From b0ee829db25f0ee6921c0359ddb4b25e74fbb16a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 28 Nov 2022 00:27:49 +0800 Subject: [PATCH] refactor: do some internal refactor. --- source/libs/executor/inc/executil.h | 2 +- source/libs/executor/inc/executorimpl.h | 159 +--- source/libs/executor/src/cachescanoperator.c | 2 +- source/libs/executor/src/exchangeoperator.c | 2 +- source/libs/executor/src/executil.c | 2 +- source/libs/executor/src/executorimpl.c | 19 +- source/libs/executor/src/groupoperator.c | 10 +- source/libs/executor/src/joinoperator.c | 2 +- source/libs/executor/src/projectoperator.c | 4 +- source/libs/executor/src/scanoperator.c | 15 +- source/libs/executor/src/sortoperator.c | 8 +- source/libs/executor/src/sysscanoperator.c | 4 +- source/libs/executor/src/tfill.c | 6 +- source/libs/executor/src/timewindowoperator.c | 618 +----------- source/libs/function/src/builtinsimpl.c | 893 ------------------ 15 files changed, 141 insertions(+), 1605 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index fd8a357a8b..d5366f1b7a 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -140,7 +140,7 @@ bool hasRemainResults(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); -SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); +SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b9d622a4c3..bd4472327c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -537,15 +537,6 @@ typedef struct SStreamIntervalOperatorInfo { SWinKey delKey; } SStreamIntervalOperatorInfo; -typedef struct SAggOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - STableQueryInfo* current; - uint64_t groupId; - SGroupResInfo groupResInfo; - SExprSupp scalarExprSup; -} SAggOperatorInfo; - typedef struct SFillOperatorInfo { struct SFillInfo* pFillInfo; SSDataBlock* pRes; @@ -577,18 +568,6 @@ typedef struct SWindowRowsSup { uint64_t groupId; } SWindowRowsSup; -typedef struct SSessionAggOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - bool reptScan; // next round scan - int64_t gap; // session window gap - int32_t tsSlotId; // primary timestamp slot id - STimeWindowAggSupp twAggSup; -} SSessionAggOperatorInfo; - typedef struct SResultWindowInfo { void* pOutputBuf; SSessionKey sessionWin; @@ -681,37 +660,6 @@ typedef struct SStreamFillOperatorInfo { SStreamFillInfo* pFillInfo; } SStreamFillOperatorInfo; -typedef struct STimeSliceOperatorInfo { - SSDataBlock* pRes; - STimeWindow win; - SInterval interval; - int64_t current; - SArray* pPrevRow; // SArray - SArray* pNextRow; // SArray - SArray* pLinearInfo; // SArray - bool isPrevRowSet; - bool isNextRowSet; - int32_t fillType; // fill type - SColumn tsCol; // primary timestamp column - SExprSupp scalarSup; // scalar calculation - struct SFillColInfo* pFillColInfo; // fill column info -} STimeSliceOperatorInfo; - -typedef struct SStateWindowOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SExprSupp scalarSup; - - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - SColumn stateCol; // start row index - bool hasKey; - SStateKeys stateKey; - int32_t tsSlotId; // primary timestamp column slot id - STimeWindowAggSupp twAggSup; -} SStateWindowOperatorInfo; - #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) @@ -726,6 +674,7 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo); int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); void cleanupExprSupp(SExprSupp* pSup); void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); + int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey); void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); @@ -735,12 +684,12 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); -bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); -void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); +bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); +void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator); -void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, - int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); +void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, + int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart); void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, @@ -751,7 +700,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); -void doDestroyExchangeOperatorInfo(void* param); +extern void doDestroyExchangeOperatorInfo(void* param); void setOperatorCompleted(SOperatorInfo* pOperator); void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, @@ -764,79 +713,73 @@ void cleanupAggSup(SAggSupporter* pAggSup); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name); -SSDataBlock* loadNextDataBlock(void* param); - void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset); SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup); - +// operator creater functions +// clang-format off SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, - STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, - const char* pUser, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, - SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, bool isStream); -SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, int32_t numOfChild); -SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream); + +SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); + +SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SExecTaskInfo* pTaskInfo); SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, - SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, int32_t numOfChild); -SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); + +SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); +// clang-format on int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 873089023a..cdd744bded 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -59,7 +59,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pInfo->readHandle = *readHandle; SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc; - pInfo->pRes = createResDataBlock(pDescNode); + pInfo->pRes = createDataBlockFromDescNode(pDescNode); int32_t numOfCols = 0; code = diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index d8c85c5ffb..280880c077 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -303,7 +303,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode } tsem_init(&pInfo->ready, 0, 0); - pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc); + pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc); pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES); pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 65937ae1bc..08e6e4792b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -208,7 +208,7 @@ SArray* createSortInfo(SNodeList* pNodeList) { return pList; } -SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { +SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) { int32_t numOfCols = LIST_LENGTH(pNode->pSlots); SSDataBlock* pBlock = createDataBlock(); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7d8c7da78d..5eaa8ba8dd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -76,6 +76,15 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) +typedef struct SAggOperatorInfo { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + STableQueryInfo* current; + uint64_t groupId; + SGroupResInfo groupResInfo; + SExprSupp scalarExprSup; +} SAggOperatorInfo; + int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; } static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock); @@ -316,8 +325,8 @@ static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus pCtx->input.startRowIndex = pStatus->startOffset; } -void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, - int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) { +void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, + int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) { for (int32_t k = 0; k < numOfOutput; ++k) { // keep it temporarily SFunctionCtxStatus status = {0}; @@ -2039,7 +2048,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -2213,7 +2222,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* goto _error; } - pInfo->pRes = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); + pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); pOperator->exprSupp.pExprInfo = pExprInfo; @@ -2512,7 +2521,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); + pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 8fd37c3b14..bbf9bd2a27 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -314,7 +314,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = j - num; - doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); + applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); // assign the group keys or user input constant values if required doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); @@ -331,7 +331,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = pBlock->info.rows - num; - doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); + applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); } } @@ -431,7 +431,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); int32_t numOfScalarExpr = 0; @@ -823,7 +823,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition uint32_t defaultPgsz = 0; uint32_t defaultBufsz = 0; - pInfo->binfo.pRes = createResDataBlock(pPartNode->node.pOutputDataBlockDesc); + pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc); getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz); if (!osTempSpaceAvailable()) { @@ -1119,7 +1119,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr } pInfo->partitionSup.needCalc = true; - pInfo->binfo.pRes = createResDataBlock(pPartNode->part.node.pOutputDataBlockDesc); + pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->part.node.pOutputDataBlockDesc); if (pInfo->binfo.pRes == NULL) { goto _error; } diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index a1b44307d4..3839af9913 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -87,7 +87,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t } int32_t numOfCols = 0; - SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); initResultSizeInfo(&pOperator->resultInfo, 4096); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index ada7964c67..819997c521 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -85,7 +85,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols); - SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc); initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = pResBlock; @@ -385,7 +385,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy } } - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc); int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7988c555e9..c0bea731bd 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -885,7 +885,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired; initResultSizeInfo(&pOperator->resultInfo, 4096); - pInfo->pResBlock = createResDataBlock(pDescNode); + pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); @@ -2352,7 +2352,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys goto _error; } - pInfo->pRes = createResDataBlock(pDescNode); + pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; @@ -2476,8 +2476,7 @@ static void destroyTagScanOperatorInfo(void* param) { taosMemoryFreeClear(param); } -SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, - STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -2499,7 +2498,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi goto _error; } - pInfo->pRes = createResDataBlock(pDescNode); + pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->readHandle = *pReadHandle; pInfo->curPos = 0; @@ -2613,7 +2612,7 @@ SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) { return pList; } -int32_t dumpSQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) { +int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) { memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond)); dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo)); for (int i = 0; i < src->numOfCols; i++) { @@ -2664,7 +2663,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { taosArrayPush(pInfo->sortSourceParams, ¶m); SQueryTableDataCond cond; - dumpSQueryTableCond(&pInfo->base.cond, &cond); + dumpQueryTableCond(&pInfo->base.cond, &cond); taosArrayPush(pInfo->queryConds, &cond); } @@ -2900,7 +2899,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN } initResultSizeInfo(&pOperator->resultInfo, 1024); - pInfo->pResBlock = createResDataBlock(pDescNode); + pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam)); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 02cd0fe696..f2c8dc5083 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -47,7 +47,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc; int32_t numOfCols = 0; - SSDataBlock* pResBlock = createResDataBlock(pDescNode); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pDescNode); SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); int32_t numOfOutputCols = 0; @@ -509,7 +509,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort initResultSizeInfo(&pOperator->resultInfo, 1024); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); - pInfo->binfo.pRes = createResDataBlock(pDescNode); + pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); int32_t numOfOutputCols = 0; @@ -766,7 +766,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size } initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); - pInfo->binfo.pRes = createResDataBlock(pDescNode); + pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); int32_t rowSize = pInfo->binfo.pRes->info.rowSize; ASSERT(rowSize < 100 * 1024 * 1024); @@ -779,7 +779,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size } SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); + SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc); initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index eea2549a42..7ef2668804 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1411,7 +1411,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pInfo->pUser = taosMemoryStrDup((void*)pUser); pInfo->sysInfo = pScanPhyNode->sysInfo; pInfo->showRewrite = pScanPhyNode->showRewrite; - pInfo->pRes = createResDataBlock(pDescNode); + pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->pCondition = pScanNode->node.pConditions; code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); @@ -1928,7 +1928,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi pInfo->readHandle = *readHandle; pInfo->uid = pBlockScanNode->suid; - pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc); + pInfo->pResBlock = createDataBlockFromDescNode(pBlockScanNode->node.pOutputDataBlockDesc); blockDataEnsureCapacity(pInfo->pResBlock, 1); int32_t numOfCols = 0; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 7674b9e479..ba826a23d2 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -1651,9 +1651,9 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi } initResultSizeInfo(&pOperator->resultInfo, 4096); - pInfo->pRes = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); - pInfo->pSrcBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); - pInfo->pPrevSrcBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); + pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); + pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); + pInfo->pPrevSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pPrevSrcBlock, pOperator->resultInfo.capacity); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 013b8d39de..80c3c1c454 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -22,13 +22,37 @@ #include "tfill.h" #include "ttime.h" +#define IS_FINAL_OP(op) ((op)->isFinal) + +typedef struct SSessionAggOperatorInfo { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + bool reptScan; // next round scan + int64_t gap; // session window gap + int32_t tsSlotId; // primary timestamp slot id + STimeWindowAggSupp twAggSup; +} SSessionAggOperatorInfo; + +typedef struct SStateWindowOperatorInfo { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SExprSupp scalarSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + SColumn stateCol; // start row index + bool hasKey; + SStateKeys stateKey; + int32_t tsSlotId; // primary timestamp column slot id + STimeWindowAggSupp twAggSup; +} SStateWindowOperatorInfo; + typedef enum SResultTsInterpType { RESULT_ROW_START_INTERP = 1, RESULT_ROW_END_INTERP = 2, } SResultTsInterpType; -#define IS_FINAL_OP(op) ((op)->isFinal) - typedef struct SPullWindowInfo { STimeWindow window; uint64_t groupId; @@ -640,7 +664,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfExprs); if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { @@ -937,7 +961,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, pInfo, pResult); @@ -972,7 +996,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } #endif updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, pInfo, pResult); } @@ -1140,7 +1164,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput); // here we start a new session window @@ -1165,7 +1189,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } @@ -1706,7 +1730,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->window.node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); SExprSupp* pSup = &pOperator->exprSupp; @@ -1845,7 +1869,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // pInfo->numOfRows data belong to the current session window updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput); // here we start a new session window @@ -1863,7 +1887,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } @@ -1938,552 +1962,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; } -static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) { - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - - SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, i); - if (!colDataIsNull_s(pColInfoData, rowIndex)) { - pkey->isNull = false; - char* val = colDataGetData(pColInfoData, rowIndex); - if (!IS_VAR_DATA_TYPE(pkey->type)) { - memcpy(pkey->pData, val, pkey->bytes); - } else { - memcpy(pkey->pData, val, varDataLen(val)); - } - } else { - pkey->isNull = true; - } - } - - pSliceInfo->isPrevRowSet = true; -} - -static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) { - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - - SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, i); - if (!colDataIsNull_s(pColInfoData, rowIndex)) { - pkey->isNull = false; - char* val = colDataGetData(pColInfoData, rowIndex); - if (!IS_VAR_DATA_TYPE(pkey->type)) { - memcpy(pkey->pData, val, pkey->bytes); - } else { - memcpy(pkey->pData, val, varDataLen(val)); - } - } else { - pkey->isNull = true; - } - } - - pSliceInfo->isNextRowSet = true; -} - -static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) { - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); - SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, i); - - // null value is represented by using key = INT64_MIN for now. - // TODO: optimize to ignore null values for linear interpolation. - if (!pLinearInfo->isStartSet) { - if (!colDataIsNull_s(pColInfoData, rowIndex)) { - pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); - memcpy(pLinearInfo->start.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes); - } - pLinearInfo->isStartSet = true; - } else if (!pLinearInfo->isEndSet) { - if (!colDataIsNull_s(pColInfoData, rowIndex)) { - pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); - memcpy(pLinearInfo->end.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes); - } - pLinearInfo->isEndSet = true; - } else { - pLinearInfo->start.key = pLinearInfo->end.key; - memcpy(pLinearInfo->start.val, pLinearInfo->end.val, pLinearInfo->bytes); - - if (!colDataIsNull_s(pColInfoData, rowIndex)) { - pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); - memcpy(pLinearInfo->end.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes); - } else { - pLinearInfo->end.key = INT64_MIN; - } - } - } - -} - -static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, bool beforeTs) { - int32_t rows = pResBlock->info.rows; - blockDataEnsureCapacity(pResBlock, rows + 1); - // todo set the correct primary timestamp column - - // output the result - bool hasInterp = true; - for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { - SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; - - int32_t dstSlot = pExprInfo->base.resSchema.slotId; - SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); - - if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { - colDataAppend(pDst, rows, (char*)&pSliceInfo->current, false); - continue; - } - - int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; - switch (pSliceInfo->fillType) { - case TSDB_FILL_NULL: { - colDataAppendNULL(pDst, rows); - break; - } - - case TSDB_FILL_SET_VALUE: { - SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal; - - if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { - float v = 0; - GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); - colDataAppend(pDst, rows, (char*)&v, false); - } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { - double v = 0; - GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); - colDataAppend(pDst, rows, (char*)&v, false); - } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { - int64_t v = 0; - GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); - colDataAppend(pDst, rows, (char*)&v, false); - } - break; - } - - case TSDB_FILL_LINEAR: { - SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, srcSlot); - - SPoint start = pLinearInfo->start; - SPoint end = pLinearInfo->end; - SPoint current = {.key = pSliceInfo->current}; - - // do not interpolate before ts range, only increate pSliceInfo->current - if (beforeTs && !pLinearInfo->isEndSet) { - return true; - } - - if (!pLinearInfo->isStartSet || !pLinearInfo->isEndSet) { - hasInterp = false; - break; - } - - if (start.key == INT64_MIN || end.key == INT64_MIN) { - colDataAppendNULL(pDst, rows); - break; - } - - current.val = taosMemoryCalloc(pLinearInfo->bytes, 1); - taosGetLinearInterpolationVal(¤t, pLinearInfo->type, &start, &end, pLinearInfo->type); - colDataAppend(pDst, rows, (char*)current.val, false); - - taosMemoryFree(current.val); - break; - } - case TSDB_FILL_PREV: { - if (!pSliceInfo->isPrevRowSet) { - hasInterp = false; - break; - } - - SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot); - if (pkey->isNull == false) { - colDataAppend(pDst, rows, pkey->pData, false); - } else { - colDataAppendNULL(pDst, rows); - } - break; - } - - case TSDB_FILL_NEXT: { - if (!pSliceInfo->isNextRowSet) { - hasInterp = false; - break; - } - - SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot); - if (pkey->isNull == false) { - colDataAppend(pDst, rows, pkey->pData, false); - } else { - colDataAppendNULL(pDst, rows); - } - break; - } - - case TSDB_FILL_NONE: - default: - break; - } - } - - if (hasInterp) { - pResBlock->info.rows += 1; - } - - return hasInterp; -} - -static void addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, - SSDataBlock* pSrcBlock, int32_t index) { - blockDataEnsureCapacity(pResBlock, pResBlock->info.rows + 1); - for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { - SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; - - int32_t dstSlot = pExprInfo->base.resSchema.slotId; - SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); - - if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { - colDataAppend(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false); - } else { - int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; - SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); - - if (colDataIsNull_s(pSrc, index)) { - colDataAppendNULL(pDst, pResBlock->info.rows); - continue; - } - - char* v = colDataGetData(pSrc, index); - colDataAppend(pDst, pResBlock->info.rows, v, false); - } - } - - pResBlock->info.rows += 1; - return; -} - - -static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { - if (pInfo->pPrevRow != NULL) { - return TSDB_CODE_SUCCESS; - } - - pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys)); - if (pInfo->pPrevRow == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - - SGroupKeys key = {0}; - key.bytes = pColInfo->info.bytes; - key.type = pColInfo->info.type; - key.isNull = false; - key.pData = taosMemoryCalloc(1, pColInfo->info.bytes); - taosArrayPush(pInfo->pPrevRow, &key); - } - - pInfo->isPrevRowSet = false; - - return TSDB_CODE_SUCCESS; -} - -static int32_t initNextRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { - if (pInfo->pNextRow != NULL) { - return TSDB_CODE_SUCCESS; - } - - pInfo->pNextRow = taosArrayInit(4, sizeof(SGroupKeys)); - if (pInfo->pNextRow == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - - SGroupKeys key = {0}; - key.bytes = pColInfo->info.bytes; - key.type = pColInfo->info.type; - key.isNull = false; - key.pData = taosMemoryCalloc(1, pColInfo->info.bytes); - taosArrayPush(pInfo->pNextRow, &key); - } - - pInfo->isNextRowSet = false; - - return TSDB_CODE_SUCCESS; -} - -static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { - if (pInfo->pLinearInfo != NULL) { - return TSDB_CODE_SUCCESS; - } - - pInfo->pLinearInfo = taosArrayInit(4, sizeof(SFillLinearInfo)); - if (pInfo->pLinearInfo == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - - SFillLinearInfo linearInfo = {0}; - linearInfo.start.key = INT64_MIN; - linearInfo.end.key = INT64_MIN; - linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes); - linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes); - linearInfo.isStartSet = false; - linearInfo.isEndSet = false; - linearInfo.type = pColInfo->info.type; - linearInfo.bytes = pColInfo->info.bytes; - taosArrayPush(pInfo->pLinearInfo, &linearInfo); - } - - return TSDB_CODE_SUCCESS; -} - -static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { - int32_t code; - code = initPrevRowsKeeper(pInfo, pBlock); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_FAILED; - } - - code = initNextRowsKeeper(pInfo, pBlock); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_FAILED; - } - - code = initFillLinearInfo(pInfo, pBlock); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_FAILED; - } - - return TSDB_CODE_SUCCESS; -} - -static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - STimeSliceOperatorInfo* pSliceInfo = pOperator->info; - SSDataBlock* pResBlock = pSliceInfo->pRes; - SExprSupp* pSup = &pOperator->exprSupp; - - int32_t order = TSDB_ORDER_ASC; - SInterval* pInterval = &pSliceInfo->interval; - SOperatorInfo* downstream = pOperator->pDownstream[0]; - - blockDataCleanup(pResBlock); - - while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - if (pBlock == NULL) { - break; - } - - int32_t code = initKeeperInfo(pSliceInfo, pBlock); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } - - // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); - - SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); - - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - - if (ts == pSliceInfo->current) { - addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); - - doKeepPrevRows(pSliceInfo, pBlock, i); - doKeepLinearInfo(pSliceInfo, pBlock, i); - - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - } else if (ts < pSliceInfo->current) { - // in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate - doKeepPrevRows(pSliceInfo, pBlock, i); - doKeepLinearInfo(pSliceInfo, pBlock, i); - - if (i < pBlock->info.rows - 1) { - // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate - doKeepNextRows(pSliceInfo, pBlock, i + 1); - int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); - if (nextTs > pSliceInfo->current) { - while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { - if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false) && pSliceInfo->fillType == TSDB_FILL_LINEAR) { - break; - } else { - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } - } - - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - } else { - // ignore current row, and do nothing - } - } else { // it is the last row of current block - doKeepPrevRows(pSliceInfo, pBlock, i); - } - } else { // ts > pSliceInfo->current - // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate - doKeepNextRows(pSliceInfo, pBlock, i); - doKeepLinearInfo(pSliceInfo, pBlock, i); - - while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { - if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, true) && pSliceInfo->fillType == TSDB_FILL_LINEAR) { - break; - } else { - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } - } - - // add current row if timestamp match - if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) { - addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); - doKeepPrevRows(pSliceInfo, pBlock, i); - - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } - - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - } - } - } - - // check if need to interpolate after last datablock - // except for fill(next), fill(linear) - while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && - pSliceInfo->fillType != TSDB_FILL_LINEAR) { - genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false); - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } - - // restore the value - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - if (pResBlock->info.rows == 0) { - pOperator->status = OP_EXEC_DONE; - } - - return pResBlock->info.rows == 0 ? NULL : pResBlock; -} - -void destroyTimeSliceOperatorInfo(void* param) { - STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param; - - pInfo->pRes = blockDataDestroy(pInfo->pRes); - - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) { - SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i); - taosMemoryFree(pKey->pData); - } - taosArrayDestroy(pInfo->pPrevRow); - - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pNextRow); ++i) { - SGroupKeys* pKey = taosArrayGet(pInfo->pNextRow, i); - taosMemoryFree(pKey->pData); - } - taosArrayDestroy(pInfo->pNextRow); - - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) { - SFillLinearInfo* pKey = taosArrayGet(pInfo->pLinearInfo, i); - taosMemoryFree(pKey->start.val); - taosMemoryFree(pKey->end.val); - } - taosArrayDestroy(pInfo->pLinearInfo); - - taosMemoryFree(pInfo->pFillColInfo); - taosMemoryFreeClear(param); -} - -SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { - STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pOperator == NULL || pInfo == NULL) { - goto _error; - } - - SInterpFuncPhysiNode* pInterpPhyNode = (SInterpFuncPhysiNode*)pPhyNode; - SExprSupp* pSup = &pOperator->exprSupp; - - int32_t numOfExprs = 0; - SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs); - int32_t code = initExprSupp(pSup, pExprInfo, numOfExprs); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - if (pInterpPhyNode->pExprs != NULL) { - int32_t num = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num); - code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - } - - pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries); - pInfo->fillType = convertFillType(pInterpPhyNode->fillMode); - initResultSizeInfo(&pOperator->resultInfo, 4096); - - pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues); - pInfo->pLinearInfo = NULL; - pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - pInfo->win = pInterpPhyNode->timeRange; - pInfo->interval.interval = pInterpPhyNode->interval; - pInfo->current = pInfo->win.skey; - - if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { - STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; - pScanInfo->base.cond.twindows = pInfo->win; - pScanInfo->base.cond.type = TIMEWINDOW_RANGE_EXTERNAL; - } - - setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo, - pTaskInfo); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, NULL); - - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - - code = appendDownstream(pOperator, &downstream, 1); - return pOperator; - -_error: - taosMemoryFree(pInfo); - taosMemoryFree(pOperator); - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - return NULL; -} - SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo) { SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); @@ -2528,7 +2006,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pStateNode->window.node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -2588,7 +2066,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); - SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); @@ -2975,7 +2453,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pSDataBlock->info.rows, numOfOutput); SWinKey key = { .ts = nextWin.skey, @@ -3226,7 +2704,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); @@ -3551,7 +3029,7 @@ static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindo return TSDB_CODE_QRY_OUT_OF_MEMORY; } updateTimeWindowInfo(pTimeWindowData, &pCurWin->sessionWin.win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, rows, numOutput); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, rows, numOutput); return TSDB_CODE_SUCCESS; } @@ -4066,7 +3544,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh SExprSupp* pSup = &pOperator->exprSupp; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -4586,7 +4064,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys SExprSupp* pSup = &pOperator->exprSupp; int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -4695,7 +4173,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, pBlock->info.rows, pSup->numOfExprs); finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo); @@ -4715,7 +4193,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, pBlock->info.rows, pSup->numOfExprs); } @@ -4873,7 +4351,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pNode->window.node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc); initBasicInfo(&iaInfo->binfo, pResBlock); initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); @@ -5022,7 +4500,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, + applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, iaInfo, pResult); @@ -5054,7 +4532,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, + applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, iaInfo, pResult); @@ -5178,7 +4656,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pIntervalPhyNode->window.node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc); initBasicInfo(&pIntervalInfo->binfo, pResBlock); initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win); @@ -5334,7 +4812,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); ASSERT(numOfCols > 0); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); SInterval interval = { .interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 640baf4f94..e572ce7a40 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -729,375 +729,6 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -//int32_t getAvgInfoSize() { return (int32_t)sizeof(SAvgRes); } -// -//bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { -// pEnv->calcMemSize = sizeof(SAvgRes); -// return true; -//} -// -//bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { -// if (!functionSetup(pCtx, pResultInfo)) { -// return false; -// } -// -// SAvgRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo); -// memset(pRes, 0, sizeof(SAvgRes)); -// return true; -//} - -//int32_t avgFunction(SqlFunctionCtx* pCtx) { -// int32_t numOfElem = 0; -// -// SInputColumnInfoData* pInput = &pCtx->input; -// SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0]; -// int32_t type = pInput->pData[0]->info.type; -// -// SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); -// pAvgRes->type = type; -// -// // computing based on the true data block -// SColumnInfoData* pCol = pInput->pData[0]; -// -// int32_t start = pInput->startRowIndex; -// int32_t numOfRows = pInput->numOfRows; -// -// if (IS_NULL_TYPE(type)) { -// numOfElem = 0; -// goto _avg_over; -// } -// -// if (pInput->colDataSMAIsSet) { -// numOfElem = numOfRows - pAgg->numOfNull; -// ASSERT(numOfElem >= 0); -// -// pAvgRes->count += numOfElem; -// if (IS_SIGNED_NUMERIC_TYPE(type)) { -// pAvgRes->sum.isum += pAgg->sum; -// } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { -// pAvgRes->sum.usum += pAgg->sum; -// } else if (IS_FLOAT_TYPE(type)) { -// pAvgRes->sum.dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum)); -// } -// } else { // computing based on the true data block -// switch (type) { -// case TSDB_DATA_TYPE_TINYINT: { -// int8_t* plist = (int8_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.isum += plist[i]; -// } -// -// break; -// } -// -// case TSDB_DATA_TYPE_SMALLINT: { -// int16_t* plist = (int16_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.isum += plist[i]; -// } -// break; -// } -// -// case TSDB_DATA_TYPE_INT: { -// int32_t* plist = (int32_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.isum += plist[i]; -// } -// -// break; -// } -// -// case TSDB_DATA_TYPE_BIGINT: { -// int64_t* plist = (int64_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.isum += plist[i]; -// } -// break; -// } -// -// case TSDB_DATA_TYPE_UTINYINT: { -// uint8_t* plist = (uint8_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.usum += plist[i]; -// } -// -// break; -// } -// -// case TSDB_DATA_TYPE_USMALLINT: { -// uint16_t* plist = (uint16_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.usum += plist[i]; -// } -// break; -// } -// -// case TSDB_DATA_TYPE_UINT: { -// uint32_t* plist = (uint32_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.usum += plist[i]; -// } -// -// break; -// } -// -// case TSDB_DATA_TYPE_UBIGINT: { -// uint64_t* plist = (uint64_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.usum += plist[i]; -// } -// break; -// } -// -// case TSDB_DATA_TYPE_FLOAT: { -// float* plist = (float*)pCol->pData; -//// float val = 0; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.dsum += plist[i]; -// } -//// pAvgRes->sum.dsum = val; -// break; -// } -// -// case TSDB_DATA_TYPE_DOUBLE: { -// double* plist = (double*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.dsum += plist[i]; -// } -// break; -// } -// -// default: -// break; -// } -// } -// -//_avg_over: -// // data in the check operation are all null, not output -// SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); -// return TSDB_CODE_SUCCESS; -//} - -//static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) { -// pOutput->type = pInput->type; -// if (IS_SIGNED_NUMERIC_TYPE(pOutput->type)) { -// pOutput->sum.isum += pInput->sum.isum; -// } else if (IS_UNSIGNED_NUMERIC_TYPE(pOutput->type)) { -// pOutput->sum.usum += pInput->sum.usum; -// } else { -// pOutput->sum.dsum += pInput->sum.dsum; -// } -// -// pOutput->count += pInput->count; -// -// return; -//} -// -//int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) { -// SInputColumnInfoData* pInput = &pCtx->input; -// SColumnInfoData* pCol = pInput->pData[0]; -// ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); -// -// SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); -// -// int32_t start = pInput->startRowIndex; -// -// for (int32_t i = start; i < start + pInput->numOfRows; ++i) { -// char* data = colDataGetData(pCol, i); -// SAvgRes* pInputInfo = (SAvgRes*)varDataVal(data); -// avgTransferInfo(pInputInfo, pInfo); -// } -// -// SET_VAL(GET_RES_INFO(pCtx), 1, 1); -// -// return TSDB_CODE_SUCCESS; -//} -// -//int32_t avgInvertFunction(SqlFunctionCtx* pCtx) { -// int32_t numOfElem = 0; -// -// // Only the pre-computing information loaded and actual data does not loaded -// SInputColumnInfoData* pInput = &pCtx->input; -// int32_t type = pInput->pData[0]->info.type; -// -// SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); -// -// // computing based on the true data block -// SColumnInfoData* pCol = pInput->pData[0]; -// -// int32_t start = pInput->startRowIndex; -// int32_t numOfRows = pInput->numOfRows; -// -// switch (type) { -// case TSDB_DATA_TYPE_TINYINT: { -// LIST_AVG_N(pAvgRes->sum.isum, int8_t); -// break; -// } -// case TSDB_DATA_TYPE_SMALLINT: { -// LIST_AVG_N(pAvgRes->sum.isum, int16_t); -// break; -// } -// case TSDB_DATA_TYPE_INT: { -// LIST_AVG_N(pAvgRes->sum.isum, int32_t); -// break; -// } -// case TSDB_DATA_TYPE_BIGINT: { -// LIST_AVG_N(pAvgRes->sum.isum, int64_t); -// break; -// } -// case TSDB_DATA_TYPE_UTINYINT: { -// LIST_AVG_N(pAvgRes->sum.usum, uint8_t); -// break; -// } -// case TSDB_DATA_TYPE_USMALLINT: { -// LIST_AVG_N(pAvgRes->sum.usum, uint16_t); -// break; -// } -// case TSDB_DATA_TYPE_UINT: { -// LIST_AVG_N(pAvgRes->sum.usum, uint32_t); -// break; -// } -// case TSDB_DATA_TYPE_UBIGINT: { -// LIST_AVG_N(pAvgRes->sum.usum, uint64_t); -// break; -// } -// case TSDB_DATA_TYPE_FLOAT: { -// LIST_AVG_N(pAvgRes->sum.dsum, float); -// break; -// } -// case TSDB_DATA_TYPE_DOUBLE: { -// LIST_AVG_N(pAvgRes->sum.dsum, double); -// break; -// } -// default: -// break; -// } -// -// // data in the check operation are all null, not output -// SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); -// return TSDB_CODE_SUCCESS; -//} -// -//int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { -// SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); -// SAvgRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); -// -// SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); -// SAvgRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); -// int16_t type = pDBuf->type == TSDB_DATA_TYPE_NULL ? pSBuf->type : pDBuf->type; -// -// if (IS_SIGNED_NUMERIC_TYPE(type)) { -// pDBuf->sum.isum += pSBuf->sum.isum; -// } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { -// pDBuf->sum.usum += pSBuf->sum.usum; -// } else { -// pDBuf->sum.dsum += pSBuf->sum.dsum; -// } -// pDBuf->count += pSBuf->count; -// -// return TSDB_CODE_SUCCESS; -//} -// -//int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { -// SInputColumnInfoData* pInput = &pCtx->input; -// -// SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); -// int32_t type = pAvgRes->type; -// -// if (IS_SIGNED_NUMERIC_TYPE(type)) { -// pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count); -// } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { -// pAvgRes->result = pAvgRes->sum.usum / ((double)pAvgRes->count); -// } else { -// pAvgRes->result = pAvgRes->sum.dsum / ((double)pAvgRes->count); -// } -// -// // check for overflow -// if (isinf(pAvgRes->result) || isnan(pAvgRes->result)) { -// GET_RES_INFO(pCtx)->numOfRes = 0; -// } -// -// return functionFinalize(pCtx, pBlock); -//} -// -//int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { -// SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); -// SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); -// int32_t resultBytes = getAvgInfoSize(); -// char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); -// -// memcpy(varDataVal(res), pInfo, resultBytes); -// varDataSetLen(res, resultBytes); -// -// int32_t slotId = pCtx->pExpr->base.resSchema.slotId; -// SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); -// -// colDataAppend(pCol, pBlock->info.rows, res, false); -// -// taosMemoryFree(res); -// return pResInfo->numOfRes; -//} - EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) { return FUNC_DATA_REQUIRED_SMA_LOAD; } @@ -1121,530 +752,6 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -//static STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, -// const STupleKey* pKey); -//static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); -//static const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos); - -//static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) { -// // the data is loaded, not only the block SMA value -// for (int32_t i = start; i < num + start; ++i) { -// char* p = colDataGetData(pCol, i); -// if (memcmp((void*)tval, p, pCol->info.bytes) == 0) { -// return i; -// } -// } -// -// // if reach here means real data of block SMA is not set in pCtx->input. -// return -1; -//} - -//int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { -// int32_t numOfElems = 0; -// -// SInputColumnInfoData* pInput = &pCtx->input; -// SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0]; -// -// SColumnInfoData* pCol = pInput->pData[0]; -// int32_t type = pCol->info.type; -// -// SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); -// SMinmaxResInfo* pBuf = GET_ROWCELL_INTERBUF(pResInfo); -// pBuf->type = type; -// -// if (IS_NULL_TYPE(type)) { -// numOfElems = 0; -// goto _min_max_over; -// } -// -// // data in current data block are qualified to the query -// if (pInput->colDataSMAIsSet) { -// numOfElems = pInput->numOfRows - pAgg->numOfNull; -// ASSERT(pInput->numOfRows == pInput->totalRows && numOfElems >= 0); -// if (numOfElems == 0) { -// return numOfElems; -// } -// -// void* tval = NULL; -// int16_t index = 0; -// -// if (isMinFunc) { -// tval = &pInput->pColumnDataAgg[0]->min; -// } else { -// tval = &pInput->pColumnDataAgg[0]->max; -// } -// -// if (!pBuf->assign) { -// pBuf->v = *(int64_t*)tval; -// if (pCtx->subsidiaries.num > 0) { -// index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); -// if (index >= 0) { -// pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); -// } -// } -// } else { -// if (IS_SIGNED_NUMERIC_TYPE(type)) { -// int64_t prev = 0; -// GET_TYPED_DATA(prev, int64_t, type, &pBuf->v); -// -// int64_t val = GET_INT64_VAL(tval); -// if ((prev < val) ^ isMinFunc) { -// *(int64_t*)&pBuf->v = val; -// if (pCtx->subsidiaries.num > 0) { -// index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); -// if (index >= 0) { -// pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); -// } -// } -// } -// } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { -// uint64_t prev = 0; -// GET_TYPED_DATA(prev, uint64_t, type, &pBuf->v); -// -// uint64_t val = GET_UINT64_VAL(tval); -// if ((prev < val) ^ isMinFunc) { -// *(uint64_t*)&pBuf->v = val; -// if (pCtx->subsidiaries.num > 0) { -// index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); -// if (index >= 0) { -// pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); -// } -// } -// } -// } else if (type == TSDB_DATA_TYPE_DOUBLE) { -// double prev = 0; -// GET_TYPED_DATA(prev, double, type, &pBuf->v); -// -// double val = GET_DOUBLE_VAL(tval); -// if ((prev < val) ^ isMinFunc) { -// *(double*)&pBuf->v = val; -// if (pCtx->subsidiaries.num > 0) { -// index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); -// if (index >= 0) { -// pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); -// } -// } -// } -// } else if (type == TSDB_DATA_TYPE_FLOAT) { -// float prev = 0; -// GET_TYPED_DATA(prev, float, type, &pBuf->v); -// -// float val = GET_DOUBLE_VAL(tval); -// if ((prev < val) ^ isMinFunc) { -// *(float*)&pBuf->v = val; -// } -// -// if (pCtx->subsidiaries.num > 0) { -// index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); -// if (index >= 0) { -// pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); -// } -// } -// } -// } -// -// pBuf->assign = true; -// return numOfElems; -// } -// -// int32_t start = pInput->startRowIndex; -// int32_t numOfRows = pInput->numOfRows; -// -// if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) { -// if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) { -// int8_t* pData = (int8_t*)pCol->pData; -// int8_t* val = (int8_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } else if (type == TSDB_DATA_TYPE_SMALLINT) { -// int16_t* pData = (int16_t*)pCol->pData; -// int16_t* val = (int16_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } else if (type == TSDB_DATA_TYPE_INT) { -// int32_t* pData = (int32_t*)pCol->pData; -// int32_t* val = (int32_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } else if (type == TSDB_DATA_TYPE_BIGINT) { -// int64_t* pData = (int64_t*)pCol->pData; -// int64_t* val = (int64_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } -// } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { -// if (type == TSDB_DATA_TYPE_UTINYINT) { -// uint8_t* pData = (uint8_t*)pCol->pData; -// uint8_t* val = (uint8_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } else if (type == TSDB_DATA_TYPE_USMALLINT) { -// uint16_t* pData = (uint16_t*)pCol->pData; -// uint16_t* val = (uint16_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } else if (type == TSDB_DATA_TYPE_UINT) { -// uint32_t* pData = (uint32_t*)pCol->pData; -// uint32_t* val = (uint32_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } else if (type == TSDB_DATA_TYPE_UBIGINT) { -// uint64_t* pData = (uint64_t*)pCol->pData; -// uint64_t* val = (uint64_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } -// } else if (type == TSDB_DATA_TYPE_DOUBLE) { -// double* pData = (double*)pCol->pData; -// double* val = (double*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } else if (type == TSDB_DATA_TYPE_FLOAT) { -// float* pData = (float*)pCol->pData; -// float* val = (float*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -//#if 0 -// if ((*val) == pData[i]) { -// continue; -// } -// -// if ((*val < pData[i]) ^ isMinFunc) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -//#endif -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } -// -//_min_max_over: -// if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) { -// pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL); -// pBuf->nullTupleSaved = true; -// } -// return numOfElems; -//} - int32_t minFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = doMinMaxHelper(pCtx, 1); SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1);