diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b473cfbb54..031f06adfd 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -655,6 +655,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx SArray* pGroupColList, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SInterval* pInterval, SSDataBlock* pResBlock, int32_t fillType, char* fillVal, bool multigroupResult, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createDistinctOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); SOperatorInfo* createTableBlockInfoScanOperator(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); @@ -674,8 +675,7 @@ SOperatorInfo* createMultiwaySortOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SExp SOperatorInfo* createGlobalAggregateOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* param, SArray* pUdfInfo, bool groupResultMixedUp); -SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, - int32_t numOfOutput); + SOperatorInfo* createSLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, void* merger, bool multigroupResult); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 8fd83d8a96..845e2cd878 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -6880,7 +6880,7 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator, bool* newgroup) } } - if (pProjectInfo->curOutput + pInfo->pRes->info.rows >= pProjectInfo->limit.limit) { + if (pProjectInfo->limit.limit > 0 && pProjectInfo->curOutput + pInfo->pRes->info.rows >= pProjectInfo->limit.limit) { pInfo->pRes->info.rows = (int32_t)(pProjectInfo->limit.limit - pProjectInfo->curOutput); } @@ -7312,22 +7312,22 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { } SStateWindowOperatorInfo* pWindowInfo = pOperator->info; - SOptrBasicInfo* pBInfo = &pWindowInfo->binfo; + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SOptrBasicInfo* pBInfo = &pWindowInfo->binfo; - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); - if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { - pOperator->status = OP_EXEC_DONE; - } +// if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { +// pOperator->status = OP_EXEC_DONE; +// } return pBInfo->pRes; } - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - int32_t order = pQueryAttr->order.order; - STimeWindow win = pQueryAttr->window; + int32_t order = TSDB_ORDER_ASC; + STimeWindow win = pTaskInfo->window; + SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { publishOperatorProfEvent(downstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); @@ -7337,28 +7337,29 @@ static SSDataBlock* doStateWindowAgg(SOperatorInfo* pOperator, bool* newgroup) { if (pBlock == NULL) { break; } - setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, pQueryAttr->order.order); - if (pWindowInfo->colIndex == -1) { - pWindowInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock); - } + +// setInputDataBlock(pOperator, pBInfo->pCtx, pDataBlock, TSDB_ORDER_ASC); +// if (pWindowInfo->colIndex == -1) { +// pWindowInfo->colIndex = getGroupbyColumnIndex(pRuntimeEnv->pQueryAttr->pGroupbyExpr, pBlock); +// } doStateWindowAggImpl(pOperator, pWindowInfo, pBlock); } // restore the value - pQueryAttr->order.order = order; - pQueryAttr->window = win; +// pQueryAttr->order.order = order; +// pQueryAttr->window = win; pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); finalizeQueryResult(pBInfo->pCtx, pOperator->numOfOutput); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo); +// initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); - if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { - pOperator->status = OP_EXEC_DONE; - } +// if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { +// pOperator->status = OP_EXEC_DONE; +// } return pBInfo->pRes->info.rows == 0 ? NULL : pBInfo->pRes; } @@ -7986,9 +7987,9 @@ SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, S return pOperator; } -SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, - int32_t numOfOutput) { +SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); + pInfo->colIndex = -1; pInfo->reptScan = false; // pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); @@ -7999,13 +8000,14 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper pOperator->name = "StateWindowOperator"; // pOperator->operatorType = OP_StateWindow; pOperator->blockingOptr = true; - pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = doStateWindowAgg; - pOperator->closeFn = destroyStateWindowOperatorInfo; + pOperator->status = OP_NOT_OPENED; + pOperator->pExpr = pExpr; + pOperator->numOfOutput = numOfCols; + + pOperator->pTaskInfo = pTaskInfo; + pOperator->info = pInfo; + pOperator->getNextFn = doStateWindowAgg; + pOperator->closeFn = destroyStateWindowOperatorInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator;