diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e94564a1b1..2f8aefcac0 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -197,7 +197,7 @@ typedef bool (*__optr_decode_fn_t)(struct SOperatorInfo* pOperator, struct SAggS struct SOptrBasicInfo* pInfo, char* result, int32_t length); typedef int32_t (*__optr_open_fn_t)(struct SOperatorInfo* pOptr); -typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr, bool* newgroup); +typedef SSDataBlock* (*__optr_fn_t)(struct SOperatorInfo* pOptr); typedef void (*__optr_close_fn_t)(void* param, int32_t num); typedef int32_t (*__optr_get_explain_fn_t)(struct SOperatorInfo* pOptr, void** pOptrExplain); @@ -643,7 +643,7 @@ void initExecTimeWindowInfo(SColumnInfoData* pColData, STimeWindow* pQueryWin void cleanupAggSup(SAggSupporter* pAggSup); void destroyBasicOperatorInfo(void* param, int32_t numOfOutput); -void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, +void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset); SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, @@ -693,6 +693,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInf SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); + SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo); @@ -713,7 +714,7 @@ void setInputDataBlock(SOperatorInfo* pOperator, SqlFunctionCtx* pCtx, SSDataBlo void finalizeQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput); void copyTsColoum(SSDataBlock* pRes, SqlFunctionCtx* pCtx, int32_t numOfOutput); -STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win); +STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win); bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); diff --git a/source/libs/executor/src/executorMain.c b/source/libs/executor/src/executorMain.c index 516afe5553..7705744694 100644 --- a/source/libs/executor/src/executorMain.c +++ b/source/libs/executor/src/executorMain.c @@ -154,14 +154,12 @@ int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) { qDebug("%s execTask is launched", GET_TASKID(pTaskInfo)); - bool newgroup = false; publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC); - int64_t st = 0; - - st = taosGetTimestampUs(); - *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot, &newgroup); + int64_t st = taosGetTimestampUs(); + *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot); uint64_t el = (taosGetTimestampUs() - st); + pTaskInfo->cost.elapsedTime += el; publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index f9173a50ba..fc23722687 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -942,7 +942,7 @@ int32_t setGroupResultOutputBuf(SOptrBasicInfo* binfo, int32_t numOfCols, char* assert(pResultRow != NULL); setResultRowKey(pResultRow, pData, type); - setResultRowOutputBufInitCtx_rv(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset); + setResultRowInitCtx(pResultRow, pCtx, numOfCols, binfo->rowCellInfoOffset); return TSDB_CODE_SUCCESS; } @@ -2032,20 +2032,9 @@ void finalizeMultiTupleQueryResult(SqlFunctionCtx* pCtx, int32_t numOfOutput, SD } } -STableQueryInfo* createTableQueryInfo(void* buf, bool groupbyColumn, STimeWindow win) { +STableQueryInfo* createTableQueryInfo(void* buf, STimeWindow win) { STableQueryInfo* pTableQueryInfo = buf; pTableQueryInfo->lastKey = win.skey; - - // set more initial size of interval/groupby query - // if (/*QUERY_IS_INTERVAL_QUERY(pQueryAttr) || */groupbyColumn) { - int32_t initialSize = 128; - // int32_t code = initResultRowInfo(&pTableQueryInfo->resInfo, initialSize); - // if (code != TSDB_CODE_SUCCESS) { - // return NULL; - // } - // } else { // in other aggregate query, do not initialize the windowResInfo - // } - return pTableQueryInfo; } @@ -2058,8 +2047,7 @@ void destroyTableQueryInfoImpl(STableQueryInfo* pTableQueryInfo) { // cleanupResultRowInfo(&pTableQueryInfo->resInfo); } -void setResultRowOutputBufInitCtx_rv(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, - int32_t* rowCellInfoOffset) { +void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowCellInfoOffset) { for (int32_t i = 0; i < numOfOutput; ++i) { pCtx[i].resultInfo = getResultCell(pResult, i, rowCellInfoOffset); @@ -2160,7 +2148,7 @@ void doSetTableGroupOutputBuf(SAggOperatorInfo* pAggInfo, int32_t numOfOutput, u } } - setResultRowOutputBufInitCtx_rv(pResultRow, pCtx, numOfOutput, rowCellInfoOffset); + setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowCellInfoOffset); } void setExecutionContext(int32_t numOfOutput, uint64_t groupId, SExecTaskInfo* pTaskInfo, SAggOperatorInfo* pAggInfo) { @@ -3202,7 +3190,7 @@ static int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -3221,8 +3209,6 @@ static SSDataBlock* doLoadRemoteData(SOperatorInfo* pOperator, bool* newgroup) { return NULL; } - *newgroup = false; - if (pExchangeInfo->seqLoadData) { return seqLoadRemoteData(pOperator); } else { @@ -3397,8 +3383,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i SSDataBlock* loadNextDataBlock(void* param) { SOperatorInfo* pOperator = (SOperatorInfo*)param; - bool newgroup = false; - return pOperator->fpSet.getNextFn(pOperator, &newgroup); + return pOperator->fpSet.getNextFn(pOperator); } static bool needToMerge(SSDataBlock* pBlock, SArray* groupInfo, char** buf, int32_t rowIndex) { @@ -3569,7 +3554,7 @@ static SSDataBlock* doMerge(SOperatorInfo* pOperator) { return (pInfo->binfo.pRes->info.rows > 0) ? pInfo->binfo.pRes : NULL; } -static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doSortedMerge(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -3716,7 +3701,7 @@ _error: return NULL; } -static SSDataBlock* doSort(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doSort(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -3819,7 +3804,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { bool newgroup = true; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, &newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -3869,7 +3854,7 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } -static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* getAggregateResult(SOperatorInfo* pOperator) { SAggOperatorInfo* pAggInfo = pOperator->info; SOptrBasicInfo* pInfo = &pAggInfo->binfo; @@ -4086,7 +4071,7 @@ static int32_t handleLimitOffset(SOperatorInfo* pOperator, SSDataBlock* pBlock) } } -static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { SProjectOperatorInfo* pProjectInfo = pOperator->info; SOptrBasicInfo* pInfo = &pProjectInfo->binfo; @@ -4124,21 +4109,18 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { - bool prevVal = *newgroup; - // The downstream exec may change the value of the newgroup, so use a local variable instead. publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { - *newgroup = prevVal; setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); break; } // Return result of the previous group in the firstly. - if (*newgroup) { + if (false) { if (pRes->info.rows > 0) { pProjectInfo->existDataBlock = pBlock; break; @@ -4208,7 +4190,7 @@ static void doHandleRemainBlockFromNewGroup(SFillOperatorInfo* pInfo, SResultInf } } -static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doFill(SOperatorInfo* pOperator) { SFillOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -4220,6 +4202,9 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) { return NULL; } + // todo handle different group data interpolation + bool n = false; + bool *newgroup = &n; doHandleRemainBlockFromNewGroup(pInfo, pResultInfo, newgroup, pTaskInfo); if (pResBlock->info.rows > pResultInfo->threshold || (!pInfo->multigroupResult && pResBlock->info.rows > 0)) { return pResBlock; @@ -4228,7 +4213,7 @@ static SSDataBlock* doFill(SOperatorInfo* pOperator, bool* newgroup) { SOperatorInfo* pDownstream = pOperator->pDownstream[0]; while (1) { publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream, newgroup); + SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (*newgroup) { @@ -4394,7 +4379,7 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf } STimeWindow win = {0, INT64_MAX}; - createTableQueryInfo(pTableQueryInfo, false, win); + createTableQueryInfo(pTableQueryInfo, win); return pTableQueryInfo; } @@ -5529,7 +5514,7 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SExplainExecInfo return TSDB_CODE_SUCCESS; } -static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { SJoinOperatorInfo* pJoinInfo = pOperator->info; SSDataBlock* pRes = pJoinInfo->pRes; @@ -5539,12 +5524,10 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup) int32_t nrows = 0; while (1) { - bool prevVal = *newgroup; - if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { SOperatorInfo* ds1 = pOperator->pDownstream[0]; publishOperatorProfEvent(ds1, QUERY_PROF_BEFORE_OPERATOR_EXEC); - pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1, newgroup); + pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1); publishOperatorProfEvent(ds1, QUERY_PROF_AFTER_OPERATOR_EXEC); pJoinInfo->leftPos = 0; @@ -5557,7 +5540,7 @@ static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator, bool* newgroup) if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { SOperatorInfo* ds2 = pOperator->pDownstream[1]; publishOperatorProfEvent(ds2, QUERY_PROF_BEFORE_OPERATOR_EXEC); - pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2, newgroup); + pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2); publishOperatorProfEvent(ds2, QUERY_PROF_AFTER_OPERATOR_EXEC); pJoinInfo->rightPos = 0; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 00ac925c2d..3ac7706545 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -256,7 +256,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } } -static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -277,7 +277,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -541,7 +541,7 @@ static SSDataBlock* buildPartitionResult(SOperatorInfo* pOperator) { return pInfo->binfo.pRes; } -static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -558,7 +558,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator, bool* newgroup) { while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index c2a101e012..a66d2a263c 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -257,11 +257,9 @@ static void prepareForDescendingScan(STableScanInfo* pTableScanInfo, SqlFunction pTableScanInfo->cond.order = TSDB_ORDER_DESC; } -static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) { STableScanInfo* pTableScanInfo = pOperator->info; - SSDataBlock* pBlock = pTableScanInfo->pResBlock; - *newgroup = false; while (tsdbNextDataBlock(pTableScanInfo->dataReader)) { if (isTaskKilled(pOperator->pTaskInfo)) { @@ -289,7 +287,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator, bool* newgroup) { return NULL; } -static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doTableScan(SOperatorInfo* pOperator) { STableScanInfo* pTableScanInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -298,10 +296,8 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { return NULL; } - *newgroup = false; - while (pTableScanInfo->current < pTableScanInfo->scanInfo.numOfAsc) { - SSDataBlock* p = doTableScanImpl(pOperator, newgroup); + SSDataBlock* p = doTableScanImpl(pOperator); if (p != NULL) { return p; } @@ -334,7 +330,7 @@ static SSDataBlock* doTableScan(SOperatorInfo* pOperator, bool* newgroup) { GET_TASKID(pTaskInfo), pWin->skey, pWin->ekey); while (pTableScanInfo->current < total) { - SSDataBlock* p = doTableScanImpl(pOperator, newgroup); + SSDataBlock* p = doTableScanImpl(pOperator); if (p != NULL) { return p; } @@ -421,13 +417,12 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle) { return pOperator; } -static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doBlockInfoScan(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } STableScanInfo* pTableScanInfo = pOperator->info; - *newgroup = false; STableBlockDistInfo tableBlockDist = {0}; tableBlockDist.numOfTables = 1; // TODO set the correct number of tables @@ -514,7 +509,7 @@ static void doClearBufferedBlocks(SStreamBlockScanInfo* pInfo) { taosArrayClear(pInfo->pBlockLists); } -static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doStreamBlockScan(SOperatorInfo* pOperator) { // NOTE: this operator does never check if current status is done or not SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SStreamBlockScanInfo* pInfo = pOperator->info; @@ -859,7 +854,7 @@ static SSDataBlock* buildSysTableMetaBlock() { return pBlock; } -static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doSysTableScan(SOperatorInfo* pOperator) { // build message and send to mnode to fetch the content of system tables. SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; SSysTableScanInfo* pInfo = pOperator->info; @@ -1191,7 +1186,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSDataBlock* pRe return pOperator; } -static SSDataBlock* doTagScan(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doTagScan(SOperatorInfo* pOperator) { #if 0 SOperatorInfo* pOperator = (SOperatorInfo*) param; if (pOperator->status == OP_EXEC_DONE) { diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 31ec6883d5..04ff1515b0 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -131,7 +131,7 @@ static int32_t setTimeWindowOutputBuf(SResultRowInfo* pResultRowInfo, STimeWindo // set time window for current result pResultRow->win = (*win); *pResult = pResultRow; - setResultRowOutputBufInitCtx_rv(pResultRow, pCtx, numOfOutput, rowCellInfoOffset); + setResultRowInitCtx(pResultRow, pCtx, numOfOutput, rowCellInfoOffset); return TSDB_CODE_SUCCESS; } @@ -763,12 +763,11 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { int32_t order = TSDB_ORDER_ASC; // STimeWindow win = {0}; - bool newgroup = false; SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, &newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -883,7 +882,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } -static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -908,7 +907,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -934,7 +933,7 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; } -static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator) { SIntervalAggOperatorInfo* pInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; @@ -945,7 +944,7 @@ static SSDataBlock* doBuildIntervalResult(SOperatorInfo* pOperator, bool* newgro SSDataBlock* pBlock = pInfo->binfo.pRes; if (pInfo->execModel == OPTR_EXEC_MODEL_STREAM) { - return pOperator->fpSet.getStreamResFn(pOperator, newgroup); + return pOperator->fpSet.getStreamResFn(pOperator); } else { pTaskInfo->code = pOperator->fpSet._openFn(pOperator); if (pTaskInfo->code != TSDB_CODE_SUCCESS) { @@ -985,7 +984,7 @@ static void finalizeUpdatedResult(int32_t numOfOutput, SDiskbasedBuf* pBuf, SArr } } -static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { SIntervalAggOperatorInfo* pInfo = pOperator->info; int32_t order = TSDB_ORDER_ASC; @@ -1002,14 +1001,13 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator, bool* newgroup } // STimeWindow win = {0}; - *newgroup = false; SOperatorInfo* downstream = pOperator->pDownstream[0]; SArray* pUpdated = NULL; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { @@ -1239,7 +1237,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator pRowSup->numOfRows, NULL, pBlock->info.rows, numOfOutput, TSDB_ORDER_ASC); } -static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -1262,7 +1260,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; @@ -1289,7 +1287,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator, bool* newgroup) return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; } -static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) { +static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator) { if (pOperator->status == OP_EXEC_DONE) { return NULL; } @@ -1309,7 +1307,7 @@ static SSDataBlock* doAllIntervalAgg(SOperatorInfo* pOperator, bool* newgroup) { while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream, newgroup); + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); publishOperatorProfEvent(downstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { break; diff --git a/source/libs/executor/test/executorTests.cpp b/source/libs/executor/test/executorTests.cpp index de8df7b916..f534cf0917 100644 --- a/source/libs/executor/test/executorTests.cpp +++ b/source/libs/executor/test/executorTests.cpp @@ -55,7 +55,7 @@ typedef struct SDummyInputInfo { SSDataBlock* pBlock; } SDummyInputInfo; -SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) { +SSDataBlock* getDummyBlock(SOperatorInfo* pOperator) { SDummyInputInfo* pInfo = static_cast(pOperator->info); if (pInfo->current >= pInfo->totalPages) { return NULL; @@ -121,7 +121,7 @@ SSDataBlock* getDummyBlock(SOperatorInfo* pOperator, bool* newgroup) { return pBlock; } -SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator, bool* newgroup) { +SSDataBlock* get2ColsDummyBlock(SOperatorInfo* pOperator) { SDummyInputInfo* pInfo = static_cast(pOperator->info); if (pInfo->current >= pInfo->totalPages) { return NULL;