diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index e8a11f8c1c..646477855b 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -521,8 +521,9 @@ typedef struct SProjectOperatorInfo { } SProjectOperatorInfo; typedef struct SLimitOperatorInfo { - int64_t limit; - int64_t total; + SLimit limit; + int64_t currentOffset; + int64_t currentRows; } SLimitOperatorInfo; typedef struct SSLimitOperatorInfo { @@ -565,14 +566,15 @@ typedef struct SGroupbyOperatorInfo { char* prevData; // previous group by value } SGroupbyOperatorInfo; -typedef struct SSWindowOperatorInfo { +typedef struct SSessionAggOperatorInfo { SOptrBasicInfo binfo; + SAggSupporter aggSup; STimeWindow curWindow; // current time window TSKEY prevTs; // previous timestamp int32_t numOfRows; // number of rows int32_t start; // start row index bool reptScan; // next round scan -} SSWindowOperatorInfo; +} SSessionAggOperatorInfo; typedef struct SStateWindowOperatorInfo { SOptrBasicInfo binfo; @@ -647,25 +649,23 @@ typedef struct SOrderOperatorInfo { SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo *createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createOrderOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SArray* pOrderVal, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t numOfDownstream, SExprInfo* pExprInfo, int32_t num, SArray* pOrderVal, SArray* pGroupInfo, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* pSysTableReadHandle, const SArray* pExprInfo, const SSchema* pSchema, int32_t tableType, SEpSet epset, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SInterval* pInterval, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream); - +SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); SOperatorInfo* createAllTimeIntervalOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput); -SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, - int32_t numOfOutput); + SOperatorInfo* createFillOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput, bool multigroupResult); SOperatorInfo* createGroupbyOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 04047c3b7c..6db16193f5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1692,7 +1692,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SGroupbyOperatorInfo *pIn tfree(pInfo->prevData); } -static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSWindowOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { +static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperatorInfo *pInfo, SSDataBlock *pSDataBlock) { STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; STableQueryInfo* item = pRuntimeEnv->current; @@ -5613,7 +5613,7 @@ void setTableScanFilterOperatorInfo(STableScanInfo* pTableScanInfo, SOperatorInf pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo; pTableScanInfo->rowCellInfoOffset = pInfo->binfo.rowCellInfoOffset; } else if (pDownstream->operatorType == OP_SessionWindow) { - SSWindowOperatorInfo* pInfo = pDownstream->info; + SSessionAggOperatorInfo* pInfo = pDownstream->info; pTableScanInfo->pCtx = pInfo->binfo.pCtx; pTableScanInfo->pResultRowInfo = &pInfo->binfo.resultRowInfo; @@ -6431,46 +6431,47 @@ static SSDataBlock* doLimit(SOperatorInfo *pOperator, bool* newgroup) { } SLimitOperatorInfo* pInfo = pOperator->info; - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; SSDataBlock* pBlock = NULL; + SOperatorInfo* pDownstream = pOperator->pDownstream[0]; + while (1) { - publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_BEFORE_OPERATOR_EXEC); - pBlock = pOperator->pDownstream[0]->getNextFn(pOperator->pDownstream[0], newgroup); - publishOperatorProfEvent(pOperator->pDownstream[0], QUERY_PROF_AFTER_OPERATOR_EXEC); + publishOperatorProfEvent(pDownstream, QUERY_PROF_BEFORE_OPERATOR_EXEC); + pBlock = pDownstream->getNextFn(pDownstream, newgroup); + publishOperatorProfEvent(pDownstream, QUERY_PROF_AFTER_OPERATOR_EXEC); if (pBlock == NULL) { doSetOperatorCompleted(pOperator); return NULL; } - if (pRuntimeEnv->currentOffset == 0) { + if (pInfo->currentOffset == 0) { break; - } else if (pRuntimeEnv->currentOffset >= pBlock->info.rows) { - pRuntimeEnv->currentOffset -= pBlock->info.rows; - } else { - int32_t remain = (int32_t)(pBlock->info.rows - pRuntimeEnv->currentOffset); + } else if (pInfo->currentOffset >= pBlock->info.rows) { + pInfo->currentOffset -= pBlock->info.rows; + } else { // TODO handle the data movement + int32_t remain = (int32_t)(pBlock->info.rows - pInfo->currentOffset); pBlock->info.rows = remain; for (int32_t i = 0; i < pBlock->info.numOfCols; ++i) { SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); int16_t bytes = pColInfoData->info.bytes; - memmove(pColInfoData->pData, pColInfoData->pData + bytes * pRuntimeEnv->currentOffset, remain * bytes); + memmove(pColInfoData->pData, pColInfoData->pData + bytes * pInfo->currentOffset, remain * bytes); } - pRuntimeEnv->currentOffset = 0; + pInfo->currentOffset = 0; break; } } - if (pInfo->total + pBlock->info.rows >= pInfo->limit) { - pBlock->info.rows = (int32_t)(pInfo->limit - pInfo->total); - pInfo->total = pInfo->limit; + if (pInfo->currentRows + pBlock->info.rows >= pInfo->limit.limit) { + pBlock->info.rows = (int32_t)(pInfo->limit.limit - pInfo->currentRows); + pInfo->currentRows = pInfo->limit.limit; doSetOperatorCompleted(pOperator); } else { - pInfo->total += pBlock->info.rows; + pInfo->currentRows += pBlock->info.rows; } return pBlock; @@ -6875,26 +6876,19 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup) return NULL; } - SSWindowOperatorInfo* pWindowInfo = pOperator->info; + SSessionAggOperatorInfo* pWindowInfo = pOperator->info; SOptrBasicInfo* pBInfo = &pWindowInfo->binfo; - - STaskRuntimeEnv* pRuntimeEnv = pOperator->pRuntimeEnv; if (pOperator->status == OP_RES_TO_RETURN) { // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); - - if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + if (pBInfo->pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { pOperator->status = OP_EXEC_DONE; } return pBInfo->pRes; } - STaskAttr* pQueryAttr = pRuntimeEnv->pQueryAttr; - //pQueryAttr->order.order = TSDB_ORDER_ASC; - int32_t order = pQueryAttr->order.order; - STimeWindow win = pQueryAttr->window; - + int32_t order = TSDB_ORDER_ASC; SOperatorInfo* downstream = pOperator->pDownstream[0]; while(1) { @@ -6906,23 +6900,19 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo *pOperator, bool* newgroup) } // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, pQueryAttr->order.order); + setInputDataBlock(pOperator, pBInfo->pCtx, pBlock, order); doSessionWindowAggImpl(pOperator, pWindowInfo, pBlock); } // restore the value - pQueryAttr->order.order = order; - pQueryAttr->window = win; - pOperator->status = OP_RES_TO_RETURN; closeAllResultRows(&pBInfo->resultRowInfo); // setTaskStatus(pOperator->pTaskInfo, QUERY_COMPLETED); finalizeQueryResult(pOperator, pBInfo->pCtx, &pBInfo->resultRowInfo, pBInfo->rowCellInfoOffset); - initGroupResInfo(&pRuntimeEnv->groupResInfo, &pBInfo->resultRowInfo); +// initGroupResInfo(&pBInfo->groupResInfo, &pBInfo->resultRowInfo); // toSDatablock(&pRuntimeEnv->groupResInfo, pRuntimeEnv, pBInfo->pRes); - - if (pBInfo->pRes->info.rows == 0 || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)) { + if (pBInfo->pRes->info.rows == 0/* || !hasRemainDataInCurrentGroup(&pRuntimeEnv->groupResInfo)*/) { pOperator->status = OP_EXEC_DONE; } @@ -7230,7 +7220,7 @@ static void destroyAggOperatorInfo(void* param, int32_t numOfOutput) { } static void destroySWindowOperatorInfo(void* param, int32_t numOfOutput) { - SSWindowOperatorInfo* pInfo = (SSWindowOperatorInfo*) param; + SSessionAggOperatorInfo* pInfo = (SSessionAggOperatorInfo*) param; doDestroyBasicInfo(&pInfo->binfo, numOfOutput); } @@ -7385,19 +7375,20 @@ SColumnInfo* extractColumnFilterInfo(SExprInfo* pExpr, int32_t numOfOutput, int3 return 0; } -SOperatorInfo* createLimitOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream) { +SOperatorInfo* createLimitOperatorInfo(SOperatorInfo* downstream, SLimit* pLimit, SExecTaskInfo* pTaskInfo) { SLimitOperatorInfo* pInfo = calloc(1, sizeof(SLimitOperatorInfo)); - pInfo->limit = pRuntimeEnv->pQueryAttr->limit.limit; + pInfo->limit = *pLimit; + pInfo->currentOffset = pLimit->offset; SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); pOperator->name = "LimitOperator"; -// pOperator->operatorType = OP_Limit; +// pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_LIMIT; pOperator->blockingOptr = false; pOperator->status = OP_NOT_OPENED; - pOperator->getNextFn = doLimit; + pOperator->getNextFn = doLimit; pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; + pOperator->pTaskInfo = pTaskInfo; int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -7481,14 +7472,15 @@ SOperatorInfo* createStatewindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOper pOperator->getNextFn = doStateWindowAgg; pOperator->closeFn = destroyStateWindowOperatorInfo; - int32_t code = appendDownstream(pOperator, &downstream, 1); + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; } -SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperatorInfo* downstream, SExprInfo* pExpr, int32_t numOfOutput) { - SSWindowOperatorInfo* pInfo = calloc(1, sizeof(SSWindowOperatorInfo)); - pInfo->binfo.pCtx = createSqlFunctionCtx(pRuntimeEnv, pExpr, numOfOutput, &pInfo->binfo.rowCellInfoOffset); - pInfo->binfo.pRes = createOutputBuf(pExpr, numOfOutput, pRuntimeEnv->resultInfo.capacity); +SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo) { + SSessionAggOperatorInfo* pInfo = calloc(1, sizeof(SSessionAggOperatorInfo)); + + doInitAggInfoSup(&pInfo->aggSup, pInfo->binfo.pCtx, numOfCols); + pInfo->binfo.pRes = pResBlock; initResultRowInfo(&pInfo->binfo.resultRowInfo, 8); pInfo->prevTs = INT64_MIN; @@ -7499,14 +7491,15 @@ SOperatorInfo* createSWindowOperatorInfo(STaskRuntimeEnv* pRuntimeEnv, SOperator // pOperator->operatorType = OP_SessionWindow; pOperator->blockingOptr = true; pOperator->status = OP_NOT_OPENED; - pOperator->pExpr = pExpr; - pOperator->numOfOutput = numOfOutput; - pOperator->info = pInfo; - pOperator->pRuntimeEnv = pRuntimeEnv; - pOperator->getNextFn = doSessionWindowAgg; - pOperator->closeFn = destroySWindowOperatorInfo; + pOperator->pExpr = pExprInfo; + pOperator->numOfOutput = numOfCols; - int32_t code = appendDownstream(pOperator, &downstream, 1); + pOperator->info = pInfo; + pOperator->getNextFn = doSessionWindowAgg; + pOperator->closeFn = destroySWindowOperatorInfo; + pOperator->pTaskInfo = pTaskInfo; + + int32_t code = appendDownstream(pOperator, &downstream, 1); return pOperator; }