diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 9cf5e0c2bf..7b03d5ca34 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -269,6 +269,7 @@ typedef struct SOperatorInfo { SResultInfo resultInfo; struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator + // todo extract struct __optr_open_fn_t _openFn; // DO NOT invoke this function directly __optr_fn_t getNextFn; __optr_fn_t getStreamResFn; // execute the aggregate in the stream model. @@ -433,6 +434,11 @@ typedef struct SAggOperatorInfo { uint32_t groupId; SGroupResInfo groupResInfo; STableQueryInfo *pTableQueryInfo; + + SExprInfo *pScalarExprInfo; + int32_t numOfScalarExpr; // the number of scalar expression before the aggregate function can be applied + SqlFunctionCtx *pScalarCtx; // scalar function requried sql function struct. + int32_t *rowCellInfoOffset; // offset value for each row result cell info } SAggOperatorInfo; typedef struct SProjectOperatorInfo { @@ -586,10 +592,7 @@ typedef struct SJoinOperatorInfo { SSDataBlock *pRight; int32_t rightPos; SColumnInfo rightCol; - SNode *pOnCondition; -// SJoinStatus *status; -// int32_t numOfUpstream; // SRspResultInfo resultInfo; } SJoinOperatorInfo; @@ -613,8 +616,8 @@ SqlFunctionCtx* createSqlFunctionCtx(SExprInfo* pExprInfo, int32_t numOfOutput, SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfCols, int32_t dataLoadFlag, int32_t repeatTime, int32_t reverseTime, SArray* pColMatchInfo, SSDataBlock* pResBlock, SNode* pCondition, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, - SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); +SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, + int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createMultiTableAggOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, SSDataBlock* pResBlock, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo); SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t num, SSDataBlock* pResBlock, SLimit* pLimit, SLimit* pSlimit, SExecTaskInfo* pTaskInfo); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index fbed927dca..4386f361c5 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -439,11 +439,11 @@ SResultRow* getNewResultRow_rv(SDiskbasedBuf* pResultBuf, int64_t tableGroupId, return pResultRow; } -static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t tid, - char* pData, int16_t bytes, bool masterscan, uint64_t tableGroupId, +static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, int64_t uid, + char* pData, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup) { bool existInCurrentResusltRowInfo = false; - SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, tableGroupId); + SET_RES_WINDOW_KEY(pSup->keyBuf, pData, bytes, groupId); SResultRowPosition* p1 = (SResultRowPosition*)taosHashGet(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes)); @@ -462,11 +462,10 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR existInCurrentResusltRowInfo = false; // this time window created by other timestamp that does not belongs to current table. assert(pResultRowInfo->curPos == -1); } else if (pResultRowInfo->size == 1) { - // ASSERT(0); SResultRowPosition* p = &pResultRowInfo->pPosition[0]; existInCurrentResusltRowInfo = (p->pageId == p1->pageId && p->offset == p1->offset); } else { // check if current pResultRowInfo contains the existInCurrentResusltRowInfo pResultRow - SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, tid, pResultRowInfo); + SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo); int64_t* index = taosHashGet(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes)); if (index != NULL) { // TODO check the scan order for current opened time window @@ -497,13 +496,13 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR prepareResultListBuffer(pResultRowInfo, pTaskInfo->env); if (p1 == NULL) { - pResult = getNewResultRow_rv(pResultBuf, tableGroupId, pSup->resultRowSize); + pResult = getNewResultRow_rv(pResultBuf, groupId, pSup->resultRowSize); initResultRow(pResult); // add a new result set for a new group SResultRowPosition pos = {.pageId = pResult->pageId, .offset = pResult->offset}; taosHashPut(pSup->pResultRowHashTable, pSup->keyBuf, GET_RES_WINDOW_KEY_LEN(bytes), &pos, sizeof(SResultRowPosition)); - SResultRowCell cell = {.groupId = tableGroupId, .pos = pos}; + SResultRowCell cell = {.groupId = groupId, .pos = pos}; taosArrayPush(pSup->pResultRowArrayList, &cell); } else { pResult = getResultRowByPos(pResultBuf, p1); @@ -514,7 +513,7 @@ static SResultRow* doSetResultOutBufByKey_rv(SDiskbasedBuf* pResultBuf, SResultR pResultRowInfo->pPosition[pResultRowInfo->size++] = (SResultRowPosition){.pageId = pResult->pageId, .offset = pResult->offset}; int64_t index = pResultRowInfo->curPos; - SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, tid, pResultRowInfo); + SET_RES_EXT_WINDOW_KEY(pSup->keyBuf, pData, bytes, uid, pResultRowInfo); taosHashPut(pSup->pResultRowListSet, pSup->keyBuf, GET_RES_EXT_WINDOW_KEY_LEN(bytes), &index, POINTER_BYTES); } else { pResult = getResultRowByPos(pResultBuf, p1); @@ -4875,6 +4874,11 @@ static int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { // setTagValue(pOperator, pAggInfo->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // } + // there is an scalar expression that needs to be calculated before apply the group aggregation. + if (pAggInfo->pScalarExprInfo != NULL) { + projectApplyFunctions(pAggInfo->pScalarExprInfo, pBlock, pBlock, pAggInfo->pScalarCtx, pAggInfo->numOfScalarExpr, NULL); + } + // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->pCtx, pBlock, order); doAggregateImpl(pOperator, 0, pInfo->pCtx); @@ -5229,6 +5233,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { return TSDB_CODE_SUCCESS; } + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; STableIntervalOperatorInfo* pInfo = pOperator->info; int32_t order = TSDB_ORDER_ASC; @@ -5248,6 +5253,9 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { // setTagValue(pOperator, pRuntimeEnv->current->pTable, pInfo->pCtx, pOperator->numOfOutput); // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); + STableQueryInfo* pTableQueryInfo = pInfo->pCurrent; + + setIntervalQueryRange(pTableQueryInfo, pBlock->info.window.skey, &pTaskInfo->window); hashIntervalAgg(pOperator, &pInfo->binfo.resultRowInfo, pBlock, 0); #if 0 // test for encode/decode result info @@ -5885,8 +5893,8 @@ static STableQueryInfo* initTableQueryInfo(const STableGroupInfo* pTableGroupInf } SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* pExprInfo, int32_t numOfCols, - SSDataBlock* pResultBlock, SExecTaskInfo* pTaskInfo, - const STableGroupInfo* pTableGroupInfo) { + SSDataBlock* pResultBlock, SExprInfo* pScalarExprInfo, + int32_t numOfScalarExpr, SExecTaskInfo* pTaskInfo, const STableGroupInfo* pTableGroupInfo) { SAggOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SAggOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -5903,6 +5911,9 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SExprInfo* } setFunctionResultOutput(&pInfo->binfo, &pInfo->aggSup, MAIN_SCAN, pTaskInfo); + pInfo->pScalarExprInfo = pScalarExprInfo; + pInfo->numOfScalarExpr = numOfScalarExpr; + pInfo->pScalarCtx = createSqlFunctionCtx(pScalarExprInfo, numOfCols, &pInfo->rowCellInfoOffset); pOperator->name = "TableAggregate"; pOperator->operatorType = QUERY_NODE_PHYSICAL_PLAN_AGG; @@ -6688,8 +6699,9 @@ static SArray* extractPartitionColInfo(SNodeList* pNodeList); SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, uint64_t queryId, uint64_t taskId, STableGroupInfo* pTableGroupInfo) { + int32_t type = nodeType(pPhyNode); + if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { - int32_t type = nodeType(pPhyNode); if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode* ) pPhyNode; @@ -6739,7 +6751,6 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } int32_t num = 0; - int32_t type = nodeType(pPhyNode); size_t size = LIST_LENGTH(pPhyNode->pChildren); SOperatorInfo** ops = taosMemoryCalloc(size, POINTER_BYTES); @@ -6762,17 +6773,17 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - SExprInfo* pScalarExprInfo = NULL; int32_t numOfScalarExpr = 0; + SExprInfo* pScalarExprInfo = NULL; + if (pAggNode->pExprs != NULL) { + pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); + } + if (pAggNode->pGroupKeys != NULL) { SArray* pColList = extractColumnInfo(pAggNode->pGroupKeys); - if (pAggNode->pExprs != NULL) { - pScalarExprInfo = createExprInfo(pAggNode->pExprs, NULL, &numOfScalarExpr); - } - pOptr = createGroupOperatorInfo(ops[0], pExprInfo, num, pResBlock, pColList, pAggNode->node.pConditions, pScalarExprInfo, numOfScalarExpr, pTaskInfo, NULL); } else { - pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pTaskInfo, pTableGroupInfo); + pOptr = createAggregateOperatorInfo(ops[0], pExprInfo, num, pResBlock, pScalarExprInfo, numOfScalarExpr, pTaskInfo, pTableGroupInfo); } } else if (QUERY_NODE_PHYSICAL_PLAN_INTERVAL == type) { SIntervalPhysiNode* pIntervalPhyNode = (SIntervalPhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index cd79d719da..d58bcd1162 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -286,7 +286,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator, bool* newgrou // the pDataBlock are always the same one, no need to call this again setInputDataBlock(pOperator, pInfo->binfo.pCtx, pBlock, order); - // there is an scalar expression that needs to be calculated before apply the group aggregation. + // there is an scalar expression that needs to be calculated right before apply the group aggregation. if (pInfo->pScalarExprInfo != NULL) { projectApplyFunctions(pInfo->pScalarExprInfo, pBlock, pBlock, pInfo->pScalarFuncCtx, pInfo->numOfScalarExpr, NULL); } @@ -343,7 +343,6 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SExprInfo* pEx pInfo->numOfScalarExpr = numOfScalarExpr; pInfo->pScalarFuncCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pInfo->binfo.rowCellInfoOffset); - int32_t code = initGroupOptrInfo(&pInfo->pGroupColVals, &pInfo->groupKeyLen, &pInfo->keyBuf, pGroupColList); if (code != TSDB_CODE_SUCCESS) { goto _error;