From b0ee829db25f0ee6921c0359ddb4b25e74fbb16a Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 28 Nov 2022 00:27:49 +0800 Subject: [PATCH 1/3] refactor: do some internal refactor. --- source/libs/executor/inc/executil.h | 2 +- source/libs/executor/inc/executorimpl.h | 159 +--- source/libs/executor/src/cachescanoperator.c | 2 +- source/libs/executor/src/exchangeoperator.c | 2 +- source/libs/executor/src/executil.c | 2 +- source/libs/executor/src/executorimpl.c | 19 +- source/libs/executor/src/groupoperator.c | 10 +- source/libs/executor/src/joinoperator.c | 2 +- source/libs/executor/src/projectoperator.c | 4 +- source/libs/executor/src/scanoperator.c | 15 +- source/libs/executor/src/sortoperator.c | 8 +- source/libs/executor/src/sysscanoperator.c | 4 +- source/libs/executor/src/tfill.c | 6 +- source/libs/executor/src/timewindowoperator.c | 618 +----------- source/libs/function/src/builtinsimpl.c | 893 ------------------ 15 files changed, 141 insertions(+), 1605 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index fd8a357a8b..d5366f1b7a 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -140,7 +140,7 @@ bool hasRemainResults(SGroupResInfo* pGroupResInfo); int32_t getNumOfTotalRes(SGroupResInfo* pGroupResInfo); -SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode); +SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode); EDealRes doTranslateTagExpr(SNode** pNode, void* pContext); int32_t getGroupIdFromTagsVal(void* pMeta, uint64_t uid, SNodeList* pGroupNode, char* keyBuf, uint64_t* pGroupId); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index b9d622a4c3..bd4472327c 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -537,15 +537,6 @@ typedef struct SStreamIntervalOperatorInfo { SWinKey delKey; } SStreamIntervalOperatorInfo; -typedef struct SAggOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - STableQueryInfo* current; - uint64_t groupId; - SGroupResInfo groupResInfo; - SExprSupp scalarExprSup; -} SAggOperatorInfo; - typedef struct SFillOperatorInfo { struct SFillInfo* pFillInfo; SSDataBlock* pRes; @@ -577,18 +568,6 @@ typedef struct SWindowRowsSup { uint64_t groupId; } SWindowRowsSup; -typedef struct SSessionAggOperatorInfo { - SOptrBasicInfo binfo; - SAggSupporter aggSup; - - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - bool reptScan; // next round scan - int64_t gap; // session window gap - int32_t tsSlotId; // primary timestamp slot id - STimeWindowAggSupp twAggSup; -} SSessionAggOperatorInfo; - typedef struct SResultWindowInfo { void* pOutputBuf; SSessionKey sessionWin; @@ -681,37 +660,6 @@ typedef struct SStreamFillOperatorInfo { SStreamFillInfo* pFillInfo; } SStreamFillOperatorInfo; -typedef struct STimeSliceOperatorInfo { - SSDataBlock* pRes; - STimeWindow win; - SInterval interval; - int64_t current; - SArray* pPrevRow; // SArray - SArray* pNextRow; // SArray - SArray* pLinearInfo; // SArray - bool isPrevRowSet; - bool isNextRowSet; - int32_t fillType; // fill type - SColumn tsCol; // primary timestamp column - SExprSupp scalarSup; // scalar calculation - struct SFillColInfo* pFillColInfo; // fill column info -} STimeSliceOperatorInfo; - -typedef struct SStateWindowOperatorInfo { - // SOptrBasicInfo should be first, SAggSupporter should be second for stream encode - SOptrBasicInfo binfo; - SAggSupporter aggSup; - SExprSupp scalarSup; - - SGroupResInfo groupResInfo; - SWindowRowsSup winSup; - SColumn stateCol; // start row index - bool hasKey; - SStateKeys stateKey; - int32_t tsSlotId; // primary timestamp column slot id - STimeWindowAggSupp twAggSup; -} SStateWindowOperatorInfo; - #define OPTR_IS_OPENED(_optr) (((_optr)->status & OP_OPENED) == OP_OPENED) #define OPTR_SET_OPENED(_optr) ((_optr)->status |= OP_OPENED) @@ -726,6 +674,7 @@ void cleanupBasicInfo(SOptrBasicInfo* pInfo); int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); void cleanupExprSupp(SExprSupp* pSup); void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); + int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey); void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); @@ -735,12 +684,12 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, SDiskbasedBuf* pBuf); -bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); -void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); +bool hasLimitOffsetInfo(SLimitInfo* pLimitInfo); +void initLimitInfo(const SNode* pLimit, const SNode* pSLimit, SLimitInfo* pLimitInfo); void applyLimitOffset(SLimitInfo* pLimitInfo, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo, SOperatorInfo* pOperator); -void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, - int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); +void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, + int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput); int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pColList, char** pNextStart); void updateLoadRemoteInfo(SLoadRemoteDataInfo* pInfo, int32_t numOfRows, int32_t dataLen, int64_t startTs, @@ -751,7 +700,7 @@ STimeWindow getFirstQualifiedTimeWindow(int64_t ts, STimeWindow* pWindow, SInter int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag); int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaultBufsz); -void doDestroyExchangeOperatorInfo(void* param); +extern void doDestroyExchangeOperatorInfo(void* param); void setOperatorCompleted(SOperatorInfo* pOperator); void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, @@ -764,79 +713,73 @@ void cleanupAggSup(SAggSupporter* pAggSup); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name); -SSDataBlock* loadNextDataBlock(void* param); - void setResultRowInitCtx(SResultRow* pResult, SqlFunctionCtx* pCtx, int32_t numOfOutput, int32_t* rowEntryInfoOffset); SResultRow* doSetResultOutBufByKey(SDiskbasedBuf* pResultBuf, SResultRowInfo* pResultRowInfo, char* pData, int16_t bytes, bool masterscan, uint64_t groupId, SExecTaskInfo* pTaskInfo, bool isIntervalQuery, SAggSupporter* pSup); - +// operator creater functions +// clang-format off SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode* pExNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, - STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, - const char* pUser, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhysiNode* pProjPhyNode, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* pSortNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, - SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, bool isStream); -SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, int32_t numOfChild); -SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** dowStreams, size_t numStreams, SMergePhysiNode* pMergePhysiNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, bool isStream); + +SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMergeIntervalPhysiNode* pIntervalPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, SMergeAlignedIntervalPhysiNode* pNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); + +SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionWinodwPhysiNode* pSessionNode, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pAggNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDistScanPhysiNode* pBlockScanNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhysiNode* pTableScanNode, SNode* pTagCond, SExecTaskInfo* pTaskInfo); SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStreamPartitionPhysiNode* pPartNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, - SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo, int32_t numOfChild); -SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, - SExecTaskInfo* pTaskInfo); +SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, int32_t numOfChild); + +SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); +// clang-format on int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, int32_t numOfOutput, SArray* pPseudoList); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 873089023a..cdd744bded 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -59,7 +59,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pInfo->readHandle = *readHandle; SDataBlockDescNode* pDescNode = pScanNode->scan.node.pOutputDataBlockDesc; - pInfo->pRes = createResDataBlock(pDescNode); + pInfo->pRes = createDataBlockFromDescNode(pDescNode); int32_t numOfCols = 0; code = diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index d8c85c5ffb..280880c077 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -303,7 +303,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode } tsem_init(&pInfo->ready, 0, 0); - pInfo->pDummyBlock = createResDataBlock(pExNode->node.pOutputDataBlockDesc); + pInfo->pDummyBlock = createDataBlockFromDescNode(pExNode->node.pOutputDataBlockDesc); pInfo->pResultBlockList = taosArrayInit(64, POINTER_BYTES); pInfo->pRecycledBlocks = taosArrayInit(64, POINTER_BYTES); diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 65937ae1bc..08e6e4792b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -208,7 +208,7 @@ SArray* createSortInfo(SNodeList* pNodeList) { return pList; } -SSDataBlock* createResDataBlock(SDataBlockDescNode* pNode) { +SSDataBlock* createDataBlockFromDescNode(SDataBlockDescNode* pNode) { int32_t numOfCols = LIST_LENGTH(pNode->pSlots); SSDataBlock* pBlock = createDataBlock(); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 7d8c7da78d..5eaa8ba8dd 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -76,6 +76,15 @@ static UNUSED_FUNC void* u_realloc(void* p, size_t __size) { #define CLEAR_QUERY_STATUS(q, st) ((q)->status &= (~(st))) #define QUERY_IS_INTERVAL_QUERY(_q) ((_q)->interval.interval > 0) +typedef struct SAggOperatorInfo { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + STableQueryInfo* current; + uint64_t groupId; + SGroupResInfo groupResInfo; + SExprSupp scalarExprSup; +} SAggOperatorInfo; + int32_t getMaximumIdleDurationSec() { return tsShellActivityTimer * 2; } static void setBlockSMAInfo(SqlFunctionCtx* pCtx, SExprInfo* pExpr, SSDataBlock* pBlock); @@ -316,8 +325,8 @@ static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus pCtx->input.startRowIndex = pStatus->startOffset; } -void doApplyFunctions(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, int32_t offset, - int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) { +void applyAggFunctionOnPartialTuples(SExecTaskInfo* taskInfo, SqlFunctionCtx* pCtx, SColumnInfoData* pTimeWindowData, + int32_t offset, int32_t forwardStep, int32_t numOfTotal, int32_t numOfOutput) { for (int32_t k = 0; k < numOfOutput; ++k) { // keep it temporarily SFunctionCtxStatus status = {0}; @@ -2039,7 +2048,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; @@ -2213,7 +2222,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* goto _error; } - pInfo->pRes = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); + pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); SExprInfo* pExprInfo = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pInfo->numOfExpr); pOperator->exprSupp.pExprInfo = pExprInfo; @@ -2512,7 +2521,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); + pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index 8fd37c3b14..bbf9bd2a27 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -314,7 +314,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = j - num; - doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); + applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); // assign the group keys or user input constant values if required doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); @@ -331,7 +331,7 @@ static void doHashGroupbyAgg(SOperatorInfo* pOperator, SSDataBlock* pBlock) { } int32_t rowIndex = pBlock->info.rows - num; - doApplyFunctions(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); + applyAggFunctionOnPartialTuples(pTaskInfo, pCtx, NULL, rowIndex, num, pBlock->info.rows, pOperator->exprSupp.numOfExprs); doAssignGroupKeys(pCtx, pOperator->exprSupp.numOfExprs, pBlock->info.rows, rowIndex); } } @@ -431,7 +431,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pAggNode->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pAggNode->node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); int32_t numOfScalarExpr = 0; @@ -823,7 +823,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition uint32_t defaultPgsz = 0; uint32_t defaultBufsz = 0; - pInfo->binfo.pRes = createResDataBlock(pPartNode->node.pOutputDataBlockDesc); + pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->node.pOutputDataBlockDesc); getBufferPgSize(pInfo->binfo.pRes->info.rowSize, &defaultPgsz, &defaultBufsz); if (!osTempSpaceAvailable()) { @@ -1119,7 +1119,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr } pInfo->partitionSup.needCalc = true; - pInfo->binfo.pRes = createResDataBlock(pPartNode->part.node.pOutputDataBlockDesc); + pInfo->binfo.pRes = createDataBlockFromDescNode(pPartNode->part.node.pOutputDataBlockDesc); if (pInfo->binfo.pRes == NULL) { goto _error; } diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index a1b44307d4..3839af9913 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -87,7 +87,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t } int32_t numOfCols = 0; - SSDataBlock* pResBlock = createResDataBlock(pJoinNode->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pJoinNode->node.pOutputDataBlockDesc); SExprInfo* pExprInfo = createExprInfo(pJoinNode->pTargets, NULL, &numOfCols); initResultSizeInfo(&pOperator->resultInfo, 4096); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index ada7964c67..819997c521 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -85,7 +85,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pProjPhyNode->pProjections, NULL, &numOfCols); - SSDataBlock* pResBlock = createResDataBlock(pProjPhyNode->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pProjPhyNode->node.pOutputDataBlockDesc); initLimitInfo(pProjPhyNode->node.pLimit, pProjPhyNode->node.pSlimit, &pInfo->limitInfo); pInfo->binfo.pRes = pResBlock; @@ -385,7 +385,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy } } - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->node.pOutputDataBlockDesc); int32_t numOfRows = 4096; size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 7988c555e9..c0bea731bd 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -885,7 +885,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, pInfo->base.dataBlockLoadFlag = pTableScanNode->dataRequired; initResultSizeInfo(&pOperator->resultInfo, 4096); - pInfo->pResBlock = createResDataBlock(pDescNode); + pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); code = filterInitFromNode((SNode*)pTableScanNode->scan.node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); @@ -2352,7 +2352,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys goto _error; } - pInfo->pRes = createResDataBlock(pDescNode); + pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); pInfo->scanMode = STREAM_SCAN_FROM_READERHANDLE; pInfo->windowSup = (SWindowSupporter){.pStreamAggSup = NULL, .gap = -1, .parentType = QUERY_NODE_PHYSICAL_PLAN}; @@ -2476,8 +2476,7 @@ static void destroyTagScanOperatorInfo(void* param) { taosMemoryFreeClear(param); } -SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, - STableListInfo* pTableListInfo, SExecTaskInfo* pTaskInfo) { +SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { STagScanInfo* pInfo = taosMemoryCalloc(1, sizeof(STagScanInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); if (pInfo == NULL || pOperator == NULL) { @@ -2499,7 +2498,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi goto _error; } - pInfo->pRes = createResDataBlock(pDescNode); + pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->readHandle = *pReadHandle; pInfo->curPos = 0; @@ -2613,7 +2612,7 @@ SArray* generateSortByTsInfo(SArray* colMatchInfo, int32_t order) { return pList; } -int32_t dumpSQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) { +int32_t dumpQueryTableCond(const SQueryTableDataCond* src, SQueryTableDataCond* dst) { memcpy((void*)dst, (void*)src, sizeof(SQueryTableDataCond)); dst->colList = taosMemoryCalloc(src->numOfCols, sizeof(SColumnInfo)); for (int i = 0; i < src->numOfCols; i++) { @@ -2664,7 +2663,7 @@ int32_t startGroupTableMergeScan(SOperatorInfo* pOperator) { taosArrayPush(pInfo->sortSourceParams, ¶m); SQueryTableDataCond cond; - dumpSQueryTableCond(&pInfo->base.cond, &cond); + dumpQueryTableCond(&pInfo->base.cond, &cond); taosArrayPush(pInfo->queryConds, &cond); } @@ -2900,7 +2899,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN } initResultSizeInfo(&pOperator->resultInfo, 1024); - pInfo->pResBlock = createResDataBlock(pDescNode); + pInfo->pResBlock = createDataBlockFromDescNode(pDescNode); blockDataEnsureCapacity(pInfo->pResBlock, pOperator->resultInfo.capacity); pInfo->sortSourceParams = taosArrayInit(64, sizeof(STableMergeScanSortSourceParam)); diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 02cd0fe696..f2c8dc5083 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -47,7 +47,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* SDataBlockDescNode* pDescNode = pSortNode->node.pOutputDataBlockDesc; int32_t numOfCols = 0; - SSDataBlock* pResBlock = createResDataBlock(pDescNode); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pDescNode); SExprInfo* pExprInfo = createExprInfo(pSortNode->pExprs, NULL, &numOfCols); int32_t numOfOutputCols = 0; @@ -509,7 +509,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort initResultSizeInfo(&pOperator->resultInfo, 1024); pOperator->exprSupp.pCtx = createSqlFunctionCtx(pExprInfo, numOfCols, &pOperator->exprSupp.rowEntryInfoOffset); - pInfo->binfo.pRes = createResDataBlock(pDescNode); + pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); int32_t numOfOutputCols = 0; @@ -766,7 +766,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size } initLimitInfo(pMergePhyNode->node.pLimit, pMergePhyNode->node.pSlimit, &pInfo->limitInfo); - pInfo->binfo.pRes = createResDataBlock(pDescNode); + pInfo->binfo.pRes = createDataBlockFromDescNode(pDescNode); int32_t rowSize = pInfo->binfo.pRes->info.rowSize; ASSERT(rowSize < 100 * 1024 * 1024); @@ -779,7 +779,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size } SPhysiNode* pChildNode = (SPhysiNode*)nodesListGetNode(pPhyNode->pChildren, 0); - SSDataBlock* pInputBlock = createResDataBlock(pChildNode->pOutputDataBlockDesc); + SSDataBlock* pInputBlock = createDataBlockFromDescNode(pChildNode->pOutputDataBlockDesc); initResultSizeInfo(&pOperator->resultInfo, 4096); blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index eea2549a42..7ef2668804 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1411,7 +1411,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pInfo->pUser = taosMemoryStrDup((void*)pUser); pInfo->sysInfo = pScanPhyNode->sysInfo; pInfo->showRewrite = pScanPhyNode->showRewrite; - pInfo->pRes = createResDataBlock(pDescNode); + pInfo->pRes = createDataBlockFromDescNode(pDescNode); pInfo->pCondition = pScanNode->node.pConditions; code = filterInitFromNode(pScanNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0); @@ -1928,7 +1928,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi pInfo->readHandle = *readHandle; pInfo->uid = pBlockScanNode->suid; - pInfo->pResBlock = createResDataBlock(pBlockScanNode->node.pOutputDataBlockDesc); + pInfo->pResBlock = createDataBlockFromDescNode(pBlockScanNode->node.pOutputDataBlockDesc); blockDataEnsureCapacity(pInfo->pResBlock, 1); int32_t numOfCols = 0; diff --git a/source/libs/executor/src/tfill.c b/source/libs/executor/src/tfill.c index 7674b9e479..ba826a23d2 100644 --- a/source/libs/executor/src/tfill.c +++ b/source/libs/executor/src/tfill.c @@ -1651,9 +1651,9 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi } initResultSizeInfo(&pOperator->resultInfo, 4096); - pInfo->pRes = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); - pInfo->pSrcBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); - pInfo->pPrevSrcBlock = createResDataBlock(pPhyFillNode->node.pOutputDataBlockDesc); + pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); + pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); + pInfo->pPrevSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity); blockDataEnsureCapacity(pInfo->pPrevSrcBlock, pOperator->resultInfo.capacity); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 013b8d39de..80c3c1c454 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -22,13 +22,37 @@ #include "tfill.h" #include "ttime.h" +#define IS_FINAL_OP(op) ((op)->isFinal) + +typedef struct SSessionAggOperatorInfo { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + bool reptScan; // next round scan + int64_t gap; // session window gap + int32_t tsSlotId; // primary timestamp slot id + STimeWindowAggSupp twAggSup; +} SSessionAggOperatorInfo; + +typedef struct SStateWindowOperatorInfo { + SOptrBasicInfo binfo; + SAggSupporter aggSup; + SExprSupp scalarSup; + SGroupResInfo groupResInfo; + SWindowRowsSup winSup; + SColumn stateCol; // start row index + bool hasKey; + SStateKeys stateKey; + int32_t tsSlotId; // primary timestamp column slot id + STimeWindowAggSupp twAggSup; +} SStateWindowOperatorInfo; + typedef enum SResultTsInterpType { RESULT_ROW_START_INTERP = 1, RESULT_ROW_END_INTERP = 2, } SResultTsInterpType; -#define IS_FINAL_OP(op) ((op)->isFinal) - typedef struct SPullWindowInfo { STimeWindow window; uint64_t groupId; @@ -640,7 +664,7 @@ static void doInterpUnclosedTimeWindow(SOperatorInfo* pOperatorInfo, int32_t num setNotInterpoWindowKey(pSup->pCtx, numOfExprs, RESULT_ROW_START_INTERP); updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &w, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, 0, pBlock->info.rows, numOfExprs); if (isResultRowInterpolated(pResult, RESULT_ROW_END_INTERP)) { @@ -937,7 +961,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, pInfo, pResult); @@ -972,7 +996,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul } #endif updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, pInfo, pResult); } @@ -1140,7 +1164,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput); // here we start a new session window @@ -1165,7 +1189,7 @@ static void doStateWindowAggImpl(SOperatorInfo* pOperator, SStateWindowOperatorI } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } @@ -1706,7 +1730,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->window.node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->window.node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); SExprSupp* pSup = &pOperator->exprSupp; @@ -1845,7 +1869,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator // pInfo->numOfRows data belong to the current session window updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &window, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput); // here we start a new session window @@ -1863,7 +1887,7 @@ static void doSessionWindowAggImpl(SOperatorInfo* pOperator, SSessionAggOperator } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &pRowSup->win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, pRowSup->startRowIndex, pRowSup->numOfRows, pBlock->info.rows, numOfOutput); } @@ -1938,552 +1962,6 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { return (pBInfo->pRes->info.rows == 0) ? NULL : pBInfo->pRes; } -static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) { - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - - SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, i); - if (!colDataIsNull_s(pColInfoData, rowIndex)) { - pkey->isNull = false; - char* val = colDataGetData(pColInfoData, rowIndex); - if (!IS_VAR_DATA_TYPE(pkey->type)) { - memcpy(pkey->pData, val, pkey->bytes); - } else { - memcpy(pkey->pData, val, varDataLen(val)); - } - } else { - pkey->isNull = true; - } - } - - pSliceInfo->isPrevRowSet = true; -} - -static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) { - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - - SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, i); - if (!colDataIsNull_s(pColInfoData, rowIndex)) { - pkey->isNull = false; - char* val = colDataGetData(pColInfoData, rowIndex); - if (!IS_VAR_DATA_TYPE(pkey->type)) { - memcpy(pkey->pData, val, pkey->bytes); - } else { - memcpy(pkey->pData, val, varDataLen(val)); - } - } else { - pkey->isNull = true; - } - } - - pSliceInfo->isNextRowSet = true; -} - -static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) { - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); - SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); - SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, i); - - // null value is represented by using key = INT64_MIN for now. - // TODO: optimize to ignore null values for linear interpolation. - if (!pLinearInfo->isStartSet) { - if (!colDataIsNull_s(pColInfoData, rowIndex)) { - pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); - memcpy(pLinearInfo->start.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes); - } - pLinearInfo->isStartSet = true; - } else if (!pLinearInfo->isEndSet) { - if (!colDataIsNull_s(pColInfoData, rowIndex)) { - pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); - memcpy(pLinearInfo->end.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes); - } - pLinearInfo->isEndSet = true; - } else { - pLinearInfo->start.key = pLinearInfo->end.key; - memcpy(pLinearInfo->start.val, pLinearInfo->end.val, pLinearInfo->bytes); - - if (!colDataIsNull_s(pColInfoData, rowIndex)) { - pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); - memcpy(pLinearInfo->end.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes); - } else { - pLinearInfo->end.key = INT64_MIN; - } - } - } - -} - -static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, bool beforeTs) { - int32_t rows = pResBlock->info.rows; - blockDataEnsureCapacity(pResBlock, rows + 1); - // todo set the correct primary timestamp column - - // output the result - bool hasInterp = true; - for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { - SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; - - int32_t dstSlot = pExprInfo->base.resSchema.slotId; - SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); - - if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { - colDataAppend(pDst, rows, (char*)&pSliceInfo->current, false); - continue; - } - - int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; - switch (pSliceInfo->fillType) { - case TSDB_FILL_NULL: { - colDataAppendNULL(pDst, rows); - break; - } - - case TSDB_FILL_SET_VALUE: { - SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal; - - if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { - float v = 0; - GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); - colDataAppend(pDst, rows, (char*)&v, false); - } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { - double v = 0; - GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); - colDataAppend(pDst, rows, (char*)&v, false); - } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { - int64_t v = 0; - GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); - colDataAppend(pDst, rows, (char*)&v, false); - } - break; - } - - case TSDB_FILL_LINEAR: { - SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, srcSlot); - - SPoint start = pLinearInfo->start; - SPoint end = pLinearInfo->end; - SPoint current = {.key = pSliceInfo->current}; - - // do not interpolate before ts range, only increate pSliceInfo->current - if (beforeTs && !pLinearInfo->isEndSet) { - return true; - } - - if (!pLinearInfo->isStartSet || !pLinearInfo->isEndSet) { - hasInterp = false; - break; - } - - if (start.key == INT64_MIN || end.key == INT64_MIN) { - colDataAppendNULL(pDst, rows); - break; - } - - current.val = taosMemoryCalloc(pLinearInfo->bytes, 1); - taosGetLinearInterpolationVal(¤t, pLinearInfo->type, &start, &end, pLinearInfo->type); - colDataAppend(pDst, rows, (char*)current.val, false); - - taosMemoryFree(current.val); - break; - } - case TSDB_FILL_PREV: { - if (!pSliceInfo->isPrevRowSet) { - hasInterp = false; - break; - } - - SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot); - if (pkey->isNull == false) { - colDataAppend(pDst, rows, pkey->pData, false); - } else { - colDataAppendNULL(pDst, rows); - } - break; - } - - case TSDB_FILL_NEXT: { - if (!pSliceInfo->isNextRowSet) { - hasInterp = false; - break; - } - - SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot); - if (pkey->isNull == false) { - colDataAppend(pDst, rows, pkey->pData, false); - } else { - colDataAppendNULL(pDst, rows); - } - break; - } - - case TSDB_FILL_NONE: - default: - break; - } - } - - if (hasInterp) { - pResBlock->info.rows += 1; - } - - return hasInterp; -} - -static void addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, - SSDataBlock* pSrcBlock, int32_t index) { - blockDataEnsureCapacity(pResBlock, pResBlock->info.rows + 1); - for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { - SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; - - int32_t dstSlot = pExprInfo->base.resSchema.slotId; - SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); - - if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { - colDataAppend(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false); - } else { - int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; - SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); - - if (colDataIsNull_s(pSrc, index)) { - colDataAppendNULL(pDst, pResBlock->info.rows); - continue; - } - - char* v = colDataGetData(pSrc, index); - colDataAppend(pDst, pResBlock->info.rows, v, false); - } - } - - pResBlock->info.rows += 1; - return; -} - - -static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { - if (pInfo->pPrevRow != NULL) { - return TSDB_CODE_SUCCESS; - } - - pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys)); - if (pInfo->pPrevRow == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - - SGroupKeys key = {0}; - key.bytes = pColInfo->info.bytes; - key.type = pColInfo->info.type; - key.isNull = false; - key.pData = taosMemoryCalloc(1, pColInfo->info.bytes); - taosArrayPush(pInfo->pPrevRow, &key); - } - - pInfo->isPrevRowSet = false; - - return TSDB_CODE_SUCCESS; -} - -static int32_t initNextRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { - if (pInfo->pNextRow != NULL) { - return TSDB_CODE_SUCCESS; - } - - pInfo->pNextRow = taosArrayInit(4, sizeof(SGroupKeys)); - if (pInfo->pNextRow == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - - SGroupKeys key = {0}; - key.bytes = pColInfo->info.bytes; - key.type = pColInfo->info.type; - key.isNull = false; - key.pData = taosMemoryCalloc(1, pColInfo->info.bytes); - taosArrayPush(pInfo->pNextRow, &key); - } - - pInfo->isNextRowSet = false; - - return TSDB_CODE_SUCCESS; -} - -static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { - if (pInfo->pLinearInfo != NULL) { - return TSDB_CODE_SUCCESS; - } - - pInfo->pLinearInfo = taosArrayInit(4, sizeof(SFillLinearInfo)); - if (pInfo->pLinearInfo == NULL) { - return TSDB_CODE_OUT_OF_MEMORY; - } - - int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); - for (int32_t i = 0; i < numOfCols; ++i) { - SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); - - SFillLinearInfo linearInfo = {0}; - linearInfo.start.key = INT64_MIN; - linearInfo.end.key = INT64_MIN; - linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes); - linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes); - linearInfo.isStartSet = false; - linearInfo.isEndSet = false; - linearInfo.type = pColInfo->info.type; - linearInfo.bytes = pColInfo->info.bytes; - taosArrayPush(pInfo->pLinearInfo, &linearInfo); - } - - return TSDB_CODE_SUCCESS; -} - -static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { - int32_t code; - code = initPrevRowsKeeper(pInfo, pBlock); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_FAILED; - } - - code = initNextRowsKeeper(pInfo, pBlock); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_FAILED; - } - - code = initFillLinearInfo(pInfo, pBlock); - if (code != TSDB_CODE_SUCCESS) { - return TSDB_CODE_FAILED; - } - - return TSDB_CODE_SUCCESS; -} - -static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { - if (pOperator->status == OP_EXEC_DONE) { - return NULL; - } - - SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - - STimeSliceOperatorInfo* pSliceInfo = pOperator->info; - SSDataBlock* pResBlock = pSliceInfo->pRes; - SExprSupp* pSup = &pOperator->exprSupp; - - int32_t order = TSDB_ORDER_ASC; - SInterval* pInterval = &pSliceInfo->interval; - SOperatorInfo* downstream = pOperator->pDownstream[0]; - - blockDataCleanup(pResBlock); - - while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); - if (pBlock == NULL) { - break; - } - - int32_t code = initKeeperInfo(pSliceInfo, pBlock); - if (code != TSDB_CODE_SUCCESS) { - T_LONG_JMP(pTaskInfo->env, code); - } - - // the pDataBlock are always the same one, no need to call this again - setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); - - SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); - for (int32_t i = 0; i < pBlock->info.rows; ++i) { - int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); - - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - - if (ts == pSliceInfo->current) { - addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); - - doKeepPrevRows(pSliceInfo, pBlock, i); - doKeepLinearInfo(pSliceInfo, pBlock, i); - - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - } else if (ts < pSliceInfo->current) { - // in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate - doKeepPrevRows(pSliceInfo, pBlock, i); - doKeepLinearInfo(pSliceInfo, pBlock, i); - - if (i < pBlock->info.rows - 1) { - // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate - doKeepNextRows(pSliceInfo, pBlock, i + 1); - int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); - if (nextTs > pSliceInfo->current) { - while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { - if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false) && pSliceInfo->fillType == TSDB_FILL_LINEAR) { - break; - } else { - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } - } - - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - } else { - // ignore current row, and do nothing - } - } else { // it is the last row of current block - doKeepPrevRows(pSliceInfo, pBlock, i); - } - } else { // ts > pSliceInfo->current - // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate - doKeepNextRows(pSliceInfo, pBlock, i); - doKeepLinearInfo(pSliceInfo, pBlock, i); - - while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { - if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, true) && pSliceInfo->fillType == TSDB_FILL_LINEAR) { - break; - } else { - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } - } - - // add current row if timestamp match - if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) { - addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); - doKeepPrevRows(pSliceInfo, pBlock, i); - - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } - - if (pSliceInfo->current > pSliceInfo->win.ekey) { - setOperatorCompleted(pOperator); - break; - } - } - } - } - - // check if need to interpolate after last datablock - // except for fill(next), fill(linear) - while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && - pSliceInfo->fillType != TSDB_FILL_LINEAR) { - genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false); - pSliceInfo->current = - taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); - } - - // restore the value - setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); - if (pResBlock->info.rows == 0) { - pOperator->status = OP_EXEC_DONE; - } - - return pResBlock->info.rows == 0 ? NULL : pResBlock; -} - -void destroyTimeSliceOperatorInfo(void* param) { - STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param; - - pInfo->pRes = blockDataDestroy(pInfo->pRes); - - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) { - SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i); - taosMemoryFree(pKey->pData); - } - taosArrayDestroy(pInfo->pPrevRow); - - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pNextRow); ++i) { - SGroupKeys* pKey = taosArrayGet(pInfo->pNextRow, i); - taosMemoryFree(pKey->pData); - } - taosArrayDestroy(pInfo->pNextRow); - - for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) { - SFillLinearInfo* pKey = taosArrayGet(pInfo->pLinearInfo, i); - taosMemoryFree(pKey->start.val); - taosMemoryFree(pKey->end.val); - } - taosArrayDestroy(pInfo->pLinearInfo); - - taosMemoryFree(pInfo->pFillColInfo); - taosMemoryFreeClear(param); -} - -SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { - STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); - SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); - if (pOperator == NULL || pInfo == NULL) { - goto _error; - } - - SInterpFuncPhysiNode* pInterpPhyNode = (SInterpFuncPhysiNode*)pPhyNode; - SExprSupp* pSup = &pOperator->exprSupp; - - int32_t numOfExprs = 0; - SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs); - int32_t code = initExprSupp(pSup, pExprInfo, numOfExprs); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - - if (pInterpPhyNode->pExprs != NULL) { - int32_t num = 0; - SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num); - code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num); - if (code != TSDB_CODE_SUCCESS) { - goto _error; - } - } - - pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries); - pInfo->fillType = convertFillType(pInterpPhyNode->fillMode); - initResultSizeInfo(&pOperator->resultInfo, 4096); - - pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues); - pInfo->pLinearInfo = NULL; - pInfo->pRes = createResDataBlock(pPhyNode->pOutputDataBlockDesc); - pInfo->win = pInterpPhyNode->timeRange; - pInfo->interval.interval = pInterpPhyNode->interval; - pInfo->current = pInfo->win.skey; - - if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { - STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; - pScanInfo->base.cond.twindows = pInfo->win; - pScanInfo->base.cond.type = TIMEWINDOW_RANGE_EXTERNAL; - } - - setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo, - pTaskInfo); - pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, NULL); - - blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); - - code = appendDownstream(pOperator, &downstream, 1); - return pOperator; - -_error: - taosMemoryFree(pInfo); - taosMemoryFree(pOperator); - pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; - return NULL; -} - SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWinodwPhysiNode* pStateNode, SExecTaskInfo* pTaskInfo) { SStateWindowOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStateWindowOperatorInfo)); @@ -2528,7 +2006,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pStateNode->window.node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pStateNode->window.node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); initResultRowInfo(&pInfo->binfo.resultRowInfo); @@ -2588,7 +2066,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); - SSDataBlock* pResBlock = createResDataBlock(pSessionNode->window.node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); @@ -2975,7 +2453,7 @@ static void doStreamIntervalAggImpl(SOperatorInfo* pOperatorInfo, SSDataBlock* p tSimpleHashPut(pInfo->aggSup.pResultRowHashTable, &key, sizeof(SWinKey), NULL, 0); } updateTimeWindowInfo(&pInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &pInfo->twAggSup.timeWindowData, startPos, forwardRows, pSDataBlock->info.rows, numOfOutput); SWinKey key = { .ts = nextWin.skey, @@ -3226,7 +2704,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); @@ -3551,7 +3029,7 @@ static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindo return TSDB_CODE_QRY_OUT_OF_MEMORY; } updateTimeWindowInfo(pTimeWindowData, &pCurWin->sessionWin.win, false); - doApplyFunctions(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, rows, numOutput); + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, pTimeWindowData, startIndex, winRows, rows, numOutput); return TSDB_CODE_SUCCESS; } @@ -4066,7 +3544,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh SExprSupp* pSup = &pOperator->exprSupp; SExprInfo* pExprInfo = createExprInfo(pSessionNode->window.pFuncs, NULL, &numOfCols); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -4586,7 +4064,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys SExprSupp* pSup = &pOperator->exprSupp; int32_t numOfCols = 0; SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &numOfCols); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); code = initBasicInfoEx(&pInfo->binfo, pSup, pExprInfo, numOfCols, pResBlock); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -4695,7 +4173,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, pBlock->info.rows, pSup->numOfExprs); finalizeResultRows(iaInfo->aggSup.pResultBuf, &pResultRowInfo->cur, pSup, pResultBlock, pTaskInfo); @@ -4715,7 +4193,7 @@ static void doMergeAlignedIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultR } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &currWin, true); - doApplyFunctions(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, + applyAggFunctionOnPartialTuples(pTaskInfo, pSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, currPos - startPos, pBlock->info.rows, pSup->numOfExprs); } @@ -4873,7 +4351,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pNode->window.node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pNode->window.node.pOutputDataBlockDesc); initBasicInfo(&iaInfo->binfo, pResBlock); initExecTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &iaInfo->win); @@ -5022,7 +4500,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* } updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &win, true); - doApplyFunctions(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, + applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, iaInfo, pResult); @@ -5054,7 +4532,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo* doWindowBorderInterpolation(iaInfo, pBlock, pResult, &nextWin, startPos, forwardRows, pExprSup); updateTimeWindowInfo(&iaInfo->twAggSup.timeWindowData, &nextWin, true); - doApplyFunctions(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, + applyAggFunctionOnPartialTuples(pTaskInfo, pExprSup->pCtx, &iaInfo->twAggSup.timeWindowData, startPos, forwardRows, pBlock->info.rows, numOfOutput); doCloseWindow(pResultRowInfo, iaInfo, pResult); @@ -5178,7 +4656,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge goto _error; } - SSDataBlock* pResBlock = createResDataBlock(pIntervalPhyNode->window.node.pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pIntervalPhyNode->window.node.pOutputDataBlockDesc); initBasicInfo(&pIntervalInfo->binfo, pResBlock); initExecTimeWindowInfo(&pIntervalInfo->twAggSup.timeWindowData, &pIntervalInfo->win); @@ -5334,7 +4812,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys SExprInfo* pExprInfo = createExprInfo(pIntervalPhyNode->window.pFuncs, NULL, &numOfCols); ASSERT(numOfCols > 0); - SSDataBlock* pResBlock = createResDataBlock(pPhyNode->pOutputDataBlockDesc); + SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); SInterval interval = { .interval = pIntervalPhyNode->interval, .sliding = pIntervalPhyNode->sliding, diff --git a/source/libs/function/src/builtinsimpl.c b/source/libs/function/src/builtinsimpl.c index 640baf4f94..e572ce7a40 100644 --- a/source/libs/function/src/builtinsimpl.c +++ b/source/libs/function/src/builtinsimpl.c @@ -729,375 +729,6 @@ bool getSumFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -//int32_t getAvgInfoSize() { return (int32_t)sizeof(SAvgRes); } -// -//bool getAvgFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { -// pEnv->calcMemSize = sizeof(SAvgRes); -// return true; -//} -// -//bool avgFunctionSetup(SqlFunctionCtx* pCtx, SResultRowEntryInfo* pResultInfo) { -// if (!functionSetup(pCtx, pResultInfo)) { -// return false; -// } -// -// SAvgRes* pRes = GET_ROWCELL_INTERBUF(pResultInfo); -// memset(pRes, 0, sizeof(SAvgRes)); -// return true; -//} - -//int32_t avgFunction(SqlFunctionCtx* pCtx) { -// int32_t numOfElem = 0; -// -// SInputColumnInfoData* pInput = &pCtx->input; -// SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0]; -// int32_t type = pInput->pData[0]->info.type; -// -// SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); -// pAvgRes->type = type; -// -// // computing based on the true data block -// SColumnInfoData* pCol = pInput->pData[0]; -// -// int32_t start = pInput->startRowIndex; -// int32_t numOfRows = pInput->numOfRows; -// -// if (IS_NULL_TYPE(type)) { -// numOfElem = 0; -// goto _avg_over; -// } -// -// if (pInput->colDataSMAIsSet) { -// numOfElem = numOfRows - pAgg->numOfNull; -// ASSERT(numOfElem >= 0); -// -// pAvgRes->count += numOfElem; -// if (IS_SIGNED_NUMERIC_TYPE(type)) { -// pAvgRes->sum.isum += pAgg->sum; -// } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { -// pAvgRes->sum.usum += pAgg->sum; -// } else if (IS_FLOAT_TYPE(type)) { -// pAvgRes->sum.dsum += GET_DOUBLE_VAL((const char*)&(pAgg->sum)); -// } -// } else { // computing based on the true data block -// switch (type) { -// case TSDB_DATA_TYPE_TINYINT: { -// int8_t* plist = (int8_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.isum += plist[i]; -// } -// -// break; -// } -// -// case TSDB_DATA_TYPE_SMALLINT: { -// int16_t* plist = (int16_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.isum += plist[i]; -// } -// break; -// } -// -// case TSDB_DATA_TYPE_INT: { -// int32_t* plist = (int32_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.isum += plist[i]; -// } -// -// break; -// } -// -// case TSDB_DATA_TYPE_BIGINT: { -// int64_t* plist = (int64_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.isum += plist[i]; -// } -// break; -// } -// -// case TSDB_DATA_TYPE_UTINYINT: { -// uint8_t* plist = (uint8_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.usum += plist[i]; -// } -// -// break; -// } -// -// case TSDB_DATA_TYPE_USMALLINT: { -// uint16_t* plist = (uint16_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.usum += plist[i]; -// } -// break; -// } -// -// case TSDB_DATA_TYPE_UINT: { -// uint32_t* plist = (uint32_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.usum += plist[i]; -// } -// -// break; -// } -// -// case TSDB_DATA_TYPE_UBIGINT: { -// uint64_t* plist = (uint64_t*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.usum += plist[i]; -// } -// break; -// } -// -// case TSDB_DATA_TYPE_FLOAT: { -// float* plist = (float*)pCol->pData; -//// float val = 0; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.dsum += plist[i]; -// } -//// pAvgRes->sum.dsum = val; -// break; -// } -// -// case TSDB_DATA_TYPE_DOUBLE: { -// double* plist = (double*)pCol->pData; -// for (int32_t i = start; i < numOfRows + pInput->startRowIndex; ++i) { -// if (pCol->hasNull && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// numOfElem += 1; -// pAvgRes->count += 1; -// pAvgRes->sum.dsum += plist[i]; -// } -// break; -// } -// -// default: -// break; -// } -// } -// -//_avg_over: -// // data in the check operation are all null, not output -// SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); -// return TSDB_CODE_SUCCESS; -//} - -//static void avgTransferInfo(SAvgRes* pInput, SAvgRes* pOutput) { -// pOutput->type = pInput->type; -// if (IS_SIGNED_NUMERIC_TYPE(pOutput->type)) { -// pOutput->sum.isum += pInput->sum.isum; -// } else if (IS_UNSIGNED_NUMERIC_TYPE(pOutput->type)) { -// pOutput->sum.usum += pInput->sum.usum; -// } else { -// pOutput->sum.dsum += pInput->sum.dsum; -// } -// -// pOutput->count += pInput->count; -// -// return; -//} -// -//int32_t avgFunctionMerge(SqlFunctionCtx* pCtx) { -// SInputColumnInfoData* pInput = &pCtx->input; -// SColumnInfoData* pCol = pInput->pData[0]; -// ASSERT(pCol->info.type == TSDB_DATA_TYPE_BINARY); -// -// SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); -// -// int32_t start = pInput->startRowIndex; -// -// for (int32_t i = start; i < start + pInput->numOfRows; ++i) { -// char* data = colDataGetData(pCol, i); -// SAvgRes* pInputInfo = (SAvgRes*)varDataVal(data); -// avgTransferInfo(pInputInfo, pInfo); -// } -// -// SET_VAL(GET_RES_INFO(pCtx), 1, 1); -// -// return TSDB_CODE_SUCCESS; -//} -// -//int32_t avgInvertFunction(SqlFunctionCtx* pCtx) { -// int32_t numOfElem = 0; -// -// // Only the pre-computing information loaded and actual data does not loaded -// SInputColumnInfoData* pInput = &pCtx->input; -// int32_t type = pInput->pData[0]->info.type; -// -// SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); -// -// // computing based on the true data block -// SColumnInfoData* pCol = pInput->pData[0]; -// -// int32_t start = pInput->startRowIndex; -// int32_t numOfRows = pInput->numOfRows; -// -// switch (type) { -// case TSDB_DATA_TYPE_TINYINT: { -// LIST_AVG_N(pAvgRes->sum.isum, int8_t); -// break; -// } -// case TSDB_DATA_TYPE_SMALLINT: { -// LIST_AVG_N(pAvgRes->sum.isum, int16_t); -// break; -// } -// case TSDB_DATA_TYPE_INT: { -// LIST_AVG_N(pAvgRes->sum.isum, int32_t); -// break; -// } -// case TSDB_DATA_TYPE_BIGINT: { -// LIST_AVG_N(pAvgRes->sum.isum, int64_t); -// break; -// } -// case TSDB_DATA_TYPE_UTINYINT: { -// LIST_AVG_N(pAvgRes->sum.usum, uint8_t); -// break; -// } -// case TSDB_DATA_TYPE_USMALLINT: { -// LIST_AVG_N(pAvgRes->sum.usum, uint16_t); -// break; -// } -// case TSDB_DATA_TYPE_UINT: { -// LIST_AVG_N(pAvgRes->sum.usum, uint32_t); -// break; -// } -// case TSDB_DATA_TYPE_UBIGINT: { -// LIST_AVG_N(pAvgRes->sum.usum, uint64_t); -// break; -// } -// case TSDB_DATA_TYPE_FLOAT: { -// LIST_AVG_N(pAvgRes->sum.dsum, float); -// break; -// } -// case TSDB_DATA_TYPE_DOUBLE: { -// LIST_AVG_N(pAvgRes->sum.dsum, double); -// break; -// } -// default: -// break; -// } -// -// // data in the check operation are all null, not output -// SET_VAL(GET_RES_INFO(pCtx), numOfElem, 1); -// return TSDB_CODE_SUCCESS; -//} -// -//int32_t avgCombine(SqlFunctionCtx* pDestCtx, SqlFunctionCtx* pSourceCtx) { -// SResultRowEntryInfo* pDResInfo = GET_RES_INFO(pDestCtx); -// SAvgRes* pDBuf = GET_ROWCELL_INTERBUF(pDResInfo); -// -// SResultRowEntryInfo* pSResInfo = GET_RES_INFO(pSourceCtx); -// SAvgRes* pSBuf = GET_ROWCELL_INTERBUF(pSResInfo); -// int16_t type = pDBuf->type == TSDB_DATA_TYPE_NULL ? pSBuf->type : pDBuf->type; -// -// if (IS_SIGNED_NUMERIC_TYPE(type)) { -// pDBuf->sum.isum += pSBuf->sum.isum; -// } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { -// pDBuf->sum.usum += pSBuf->sum.usum; -// } else { -// pDBuf->sum.dsum += pSBuf->sum.dsum; -// } -// pDBuf->count += pSBuf->count; -// -// return TSDB_CODE_SUCCESS; -//} -// -//int32_t avgFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { -// SInputColumnInfoData* pInput = &pCtx->input; -// -// SAvgRes* pAvgRes = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); -// int32_t type = pAvgRes->type; -// -// if (IS_SIGNED_NUMERIC_TYPE(type)) { -// pAvgRes->result = pAvgRes->sum.isum / ((double)pAvgRes->count); -// } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { -// pAvgRes->result = pAvgRes->sum.usum / ((double)pAvgRes->count); -// } else { -// pAvgRes->result = pAvgRes->sum.dsum / ((double)pAvgRes->count); -// } -// -// // check for overflow -// if (isinf(pAvgRes->result) || isnan(pAvgRes->result)) { -// GET_RES_INFO(pCtx)->numOfRes = 0; -// } -// -// return functionFinalize(pCtx, pBlock); -//} -// -//int32_t avgPartialFinalize(SqlFunctionCtx* pCtx, SSDataBlock* pBlock) { -// SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); -// SAvgRes* pInfo = GET_ROWCELL_INTERBUF(GET_RES_INFO(pCtx)); -// int32_t resultBytes = getAvgInfoSize(); -// char* res = taosMemoryCalloc(resultBytes + VARSTR_HEADER_SIZE, sizeof(char)); -// -// memcpy(varDataVal(res), pInfo, resultBytes); -// varDataSetLen(res, resultBytes); -// -// int32_t slotId = pCtx->pExpr->base.resSchema.slotId; -// SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId); -// -// colDataAppend(pCol, pBlock->info.rows, res, false); -// -// taosMemoryFree(res); -// return pResInfo->numOfRes; -//} - EFuncDataRequired statisDataRequired(SFunctionNode* pFunc, STimeWindow* pTimeWindow) { return FUNC_DATA_REQUIRED_SMA_LOAD; } @@ -1121,530 +752,6 @@ bool getMinmaxFuncEnv(SFunctionNode* UNUSED_PARAM(pFunc), SFuncExecEnv* pEnv) { return true; } -//static STuplePos saveTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, -// const STupleKey* pKey); -//static int32_t updateTupleData(SqlFunctionCtx* pCtx, int32_t rowIndex, const SSDataBlock* pSrcBlock, STuplePos* pPos); -//static const char* loadTupleData(SqlFunctionCtx* pCtx, const STuplePos* pPos); - -//static int32_t findRowIndex(int32_t start, int32_t num, SColumnInfoData* pCol, const char* tval) { -// // the data is loaded, not only the block SMA value -// for (int32_t i = start; i < num + start; ++i) { -// char* p = colDataGetData(pCol, i); -// if (memcmp((void*)tval, p, pCol->info.bytes) == 0) { -// return i; -// } -// } -// -// // if reach here means real data of block SMA is not set in pCtx->input. -// return -1; -//} - -//int32_t doMinMaxHelper(SqlFunctionCtx* pCtx, int32_t isMinFunc) { -// int32_t numOfElems = 0; -// -// SInputColumnInfoData* pInput = &pCtx->input; -// SColumnDataAgg* pAgg = pInput->pColumnDataAgg[0]; -// -// SColumnInfoData* pCol = pInput->pData[0]; -// int32_t type = pCol->info.type; -// -// SResultRowEntryInfo* pResInfo = GET_RES_INFO(pCtx); -// SMinmaxResInfo* pBuf = GET_ROWCELL_INTERBUF(pResInfo); -// pBuf->type = type; -// -// if (IS_NULL_TYPE(type)) { -// numOfElems = 0; -// goto _min_max_over; -// } -// -// // data in current data block are qualified to the query -// if (pInput->colDataSMAIsSet) { -// numOfElems = pInput->numOfRows - pAgg->numOfNull; -// ASSERT(pInput->numOfRows == pInput->totalRows && numOfElems >= 0); -// if (numOfElems == 0) { -// return numOfElems; -// } -// -// void* tval = NULL; -// int16_t index = 0; -// -// if (isMinFunc) { -// tval = &pInput->pColumnDataAgg[0]->min; -// } else { -// tval = &pInput->pColumnDataAgg[0]->max; -// } -// -// if (!pBuf->assign) { -// pBuf->v = *(int64_t*)tval; -// if (pCtx->subsidiaries.num > 0) { -// index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); -// if (index >= 0) { -// pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); -// } -// } -// } else { -// if (IS_SIGNED_NUMERIC_TYPE(type)) { -// int64_t prev = 0; -// GET_TYPED_DATA(prev, int64_t, type, &pBuf->v); -// -// int64_t val = GET_INT64_VAL(tval); -// if ((prev < val) ^ isMinFunc) { -// *(int64_t*)&pBuf->v = val; -// if (pCtx->subsidiaries.num > 0) { -// index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); -// if (index >= 0) { -// pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); -// } -// } -// } -// } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { -// uint64_t prev = 0; -// GET_TYPED_DATA(prev, uint64_t, type, &pBuf->v); -// -// uint64_t val = GET_UINT64_VAL(tval); -// if ((prev < val) ^ isMinFunc) { -// *(uint64_t*)&pBuf->v = val; -// if (pCtx->subsidiaries.num > 0) { -// index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); -// if (index >= 0) { -// pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); -// } -// } -// } -// } else if (type == TSDB_DATA_TYPE_DOUBLE) { -// double prev = 0; -// GET_TYPED_DATA(prev, double, type, &pBuf->v); -// -// double val = GET_DOUBLE_VAL(tval); -// if ((prev < val) ^ isMinFunc) { -// *(double*)&pBuf->v = val; -// if (pCtx->subsidiaries.num > 0) { -// index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); -// if (index >= 0) { -// pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); -// } -// } -// } -// } else if (type == TSDB_DATA_TYPE_FLOAT) { -// float prev = 0; -// GET_TYPED_DATA(prev, float, type, &pBuf->v); -// -// float val = GET_DOUBLE_VAL(tval); -// if ((prev < val) ^ isMinFunc) { -// *(float*)&pBuf->v = val; -// } -// -// if (pCtx->subsidiaries.num > 0) { -// index = findRowIndex(pInput->startRowIndex, pInput->numOfRows, pCol, tval); -// if (index >= 0) { -// pBuf->tuplePos = saveTupleData(pCtx, index, pCtx->pSrcBlock, NULL); -// } -// } -// } -// } -// -// pBuf->assign = true; -// return numOfElems; -// } -// -// int32_t start = pInput->startRowIndex; -// int32_t numOfRows = pInput->numOfRows; -// -// if (IS_SIGNED_NUMERIC_TYPE(type) || type == TSDB_DATA_TYPE_BOOL) { -// if (type == TSDB_DATA_TYPE_TINYINT || type == TSDB_DATA_TYPE_BOOL) { -// int8_t* pData = (int8_t*)pCol->pData; -// int8_t* val = (int8_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } else if (type == TSDB_DATA_TYPE_SMALLINT) { -// int16_t* pData = (int16_t*)pCol->pData; -// int16_t* val = (int16_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } else if (type == TSDB_DATA_TYPE_INT) { -// int32_t* pData = (int32_t*)pCol->pData; -// int32_t* val = (int32_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } else if (type == TSDB_DATA_TYPE_BIGINT) { -// int64_t* pData = (int64_t*)pCol->pData; -// int64_t* val = (int64_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } -// } else if (IS_UNSIGNED_NUMERIC_TYPE(type)) { -// if (type == TSDB_DATA_TYPE_UTINYINT) { -// uint8_t* pData = (uint8_t*)pCol->pData; -// uint8_t* val = (uint8_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } else if (type == TSDB_DATA_TYPE_USMALLINT) { -// uint16_t* pData = (uint16_t*)pCol->pData; -// uint16_t* val = (uint16_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } else if (type == TSDB_DATA_TYPE_UINT) { -// uint32_t* pData = (uint32_t*)pCol->pData; -// uint32_t* val = (uint32_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } else if (type == TSDB_DATA_TYPE_UBIGINT) { -// uint64_t* pData = (uint64_t*)pCol->pData; -// uint64_t* val = (uint64_t*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } -// } else if (type == TSDB_DATA_TYPE_DOUBLE) { -// double* pData = (double*)pCol->pData; -// double* val = (double*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -// // ignore the equivalent data value -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } else if (type == TSDB_DATA_TYPE_FLOAT) { -// float* pData = (float*)pCol->pData; -// float* val = (float*)&pBuf->v; -// -// for (int32_t i = start; i < start + numOfRows; ++i) { -// if ((pCol->hasNull) && colDataIsNull_f(pCol->nullbitmap, i)) { -// continue; -// } -// -// if (!pBuf->assign) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// pBuf->tuplePos = saveTupleData(pCtx, i, pCtx->pSrcBlock, NULL); -// } -// pBuf->assign = true; -// } else { -//#if 0 -// if ((*val) == pData[i]) { -// continue; -// } -// -// if ((*val < pData[i]) ^ isMinFunc) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -//#endif -// // NOTE: An faster version to avoid one additional comparison with FPU. -// if (isMinFunc) { // min -// if (*val > pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } else { // max -// if (*val < pData[i]) { -// *val = pData[i]; -// if (pCtx->subsidiaries.num > 0) { -// updateTupleData(pCtx, i, pCtx->pSrcBlock, &pBuf->tuplePos); -// } -// } -// } -// } -// -// numOfElems += 1; -// } -// } -// -//_min_max_over: -// if (numOfElems == 0 && pCtx->subsidiaries.num > 0 && !pBuf->nullTupleSaved) { -// pBuf->nullTuplePos = saveTupleData(pCtx, pInput->startRowIndex, pCtx->pSrcBlock, NULL); -// pBuf->nullTupleSaved = true; -// } -// return numOfElems; -//} - int32_t minFunction(SqlFunctionCtx* pCtx) { int32_t numOfElems = doMinMaxHelper(pCtx, 1); SET_VAL(GET_RES_INFO(pCtx), numOfElems, 1); From 87d9836a756d154e78501076174cb206aaf57009 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 28 Nov 2022 00:28:29 +0800 Subject: [PATCH 2/3] refactor: do some internal refactor. --- source/libs/executor/src/timesliceoperator.c | 587 +++++++++++++++++++ 1 file changed, 587 insertions(+) create mode 100644 source/libs/executor/src/timesliceoperator.c diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c new file mode 100644 index 0000000000..d8cef86971 --- /dev/null +++ b/source/libs/executor/src/timesliceoperator.c @@ -0,0 +1,587 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "executorimpl.h" +#include "filter.h" +#include "function.h" +#include "functionMgt.h" +#include "tcommon.h" +#include "tcompare.h" +#include "tdatablock.h" +#include "tfill.h" +#include "ttime.h" + +typedef struct STimeSliceOperatorInfo { + SSDataBlock* pRes; + STimeWindow win; + SInterval interval; + int64_t current; + SArray* pPrevRow; // SArray + SArray* pNextRow; // SArray + SArray* pLinearInfo; // SArray + bool isPrevRowSet; + bool isNextRowSet; + int32_t fillType; // fill type + SColumn tsCol; // primary timestamp column + SExprSupp scalarSup; // scalar calculation + struct SFillColInfo* pFillColInfo; // fill column info +} STimeSliceOperatorInfo; + +static void destroyTimeSliceOperatorInfo(void* param); + +static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) { + int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, i); + if (!colDataIsNull_s(pColInfoData, rowIndex)) { + pkey->isNull = false; + char* val = colDataGetData(pColInfoData, rowIndex); + if (!IS_VAR_DATA_TYPE(pkey->type)) { + memcpy(pkey->pData, val, pkey->bytes); + } else { + memcpy(pkey->pData, val, varDataLen(val)); + } + } else { + pkey->isNull = true; + } + } + + pSliceInfo->isPrevRowSet = true; +} + +static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) { + int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + + SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, i); + if (!colDataIsNull_s(pColInfoData, rowIndex)) { + pkey->isNull = false; + char* val = colDataGetData(pColInfoData, rowIndex); + if (!IS_VAR_DATA_TYPE(pkey->type)) { + memcpy(pkey->pData, val, pkey->bytes); + } else { + memcpy(pkey->pData, val, varDataLen(val)); + } + } else { + pkey->isNull = true; + } + } + + pSliceInfo->isNextRowSet = true; +} + +static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) { + int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i); + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); + SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, i); + + // null value is represented by using key = INT64_MIN for now. + // TODO: optimize to ignore null values for linear interpolation. + if (!pLinearInfo->isStartSet) { + if (!colDataIsNull_s(pColInfoData, rowIndex)) { + pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); + memcpy(pLinearInfo->start.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes); + } + pLinearInfo->isStartSet = true; + } else if (!pLinearInfo->isEndSet) { + if (!colDataIsNull_s(pColInfoData, rowIndex)) { + pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); + memcpy(pLinearInfo->end.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes); + } + pLinearInfo->isEndSet = true; + } else { + pLinearInfo->start.key = pLinearInfo->end.key; + memcpy(pLinearInfo->start.val, pLinearInfo->end.val, pLinearInfo->bytes); + + if (!colDataIsNull_s(pColInfoData, rowIndex)) { + pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex); + memcpy(pLinearInfo->end.val, colDataGetData(pColInfoData, rowIndex), pLinearInfo->bytes); + } else { + pLinearInfo->end.key = INT64_MIN; + } + } + } + +} + +static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, bool beforeTs) { + int32_t rows = pResBlock->info.rows; + blockDataEnsureCapacity(pResBlock, rows + 1); + // todo set the correct primary timestamp column + + // output the result + bool hasInterp = true; + for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { + SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; + + int32_t dstSlot = pExprInfo->base.resSchema.slotId; + SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); + + if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { + colDataAppend(pDst, rows, (char*)&pSliceInfo->current, false); + continue; + } + + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + switch (pSliceInfo->fillType) { + case TSDB_FILL_NULL: { + colDataAppendNULL(pDst, rows); + break; + } + + case TSDB_FILL_SET_VALUE: { + SVariant* pVar = &pSliceInfo->pFillColInfo[j].fillVal; + + if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) { + float v = 0; + GET_TYPED_DATA(v, float, pVar->nType, &pVar->i); + colDataAppend(pDst, rows, (char*)&v, false); + } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) { + double v = 0; + GET_TYPED_DATA(v, double, pVar->nType, &pVar->i); + colDataAppend(pDst, rows, (char*)&v, false); + } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) { + int64_t v = 0; + GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i); + colDataAppend(pDst, rows, (char*)&v, false); + } + break; + } + + case TSDB_FILL_LINEAR: { + SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, srcSlot); + + SPoint start = pLinearInfo->start; + SPoint end = pLinearInfo->end; + SPoint current = {.key = pSliceInfo->current}; + + // do not interpolate before ts range, only increate pSliceInfo->current + if (beforeTs && !pLinearInfo->isEndSet) { + return true; + } + + if (!pLinearInfo->isStartSet || !pLinearInfo->isEndSet) { + hasInterp = false; + break; + } + + if (start.key == INT64_MIN || end.key == INT64_MIN) { + colDataAppendNULL(pDst, rows); + break; + } + + current.val = taosMemoryCalloc(pLinearInfo->bytes, 1); + taosGetLinearInterpolationVal(¤t, pLinearInfo->type, &start, &end, pLinearInfo->type); + colDataAppend(pDst, rows, (char*)current.val, false); + + taosMemoryFree(current.val); + break; + } + case TSDB_FILL_PREV: { + if (!pSliceInfo->isPrevRowSet) { + hasInterp = false; + break; + } + + SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot); + if (pkey->isNull == false) { + colDataAppend(pDst, rows, pkey->pData, false); + } else { + colDataAppendNULL(pDst, rows); + } + break; + } + + case TSDB_FILL_NEXT: { + if (!pSliceInfo->isNextRowSet) { + hasInterp = false; + break; + } + + SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, srcSlot); + if (pkey->isNull == false) { + colDataAppend(pDst, rows, pkey->pData, false); + } else { + colDataAppendNULL(pDst, rows); + } + break; + } + + case TSDB_FILL_NONE: + default: + break; + } + } + + if (hasInterp) { + pResBlock->info.rows += 1; + } + + return hasInterp; +} + +static void addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock, + SSDataBlock* pSrcBlock, int32_t index) { + blockDataEnsureCapacity(pResBlock, pResBlock->info.rows + 1); + for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) { + SExprInfo* pExprInfo = &pExprSup->pExprInfo[j]; + + int32_t dstSlot = pExprInfo->base.resSchema.slotId; + SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot); + + if (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) { + colDataAppend(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false); + } else { + int32_t srcSlot = pExprInfo->base.pParam[0].pCol->slotId; + SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot); + + if (colDataIsNull_s(pSrc, index)) { + colDataAppendNULL(pDst, pResBlock->info.rows); + continue; + } + + char* v = colDataGetData(pSrc, index); + colDataAppend(pDst, pResBlock->info.rows, v, false); + } + } + + pResBlock->info.rows += 1; + return; +} + + +static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { + if (pInfo->pPrevRow != NULL) { + return TSDB_CODE_SUCCESS; + } + + pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys)); + if (pInfo->pPrevRow == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + + SGroupKeys key = {0}; + key.bytes = pColInfo->info.bytes; + key.type = pColInfo->info.type; + key.isNull = false; + key.pData = taosMemoryCalloc(1, pColInfo->info.bytes); + taosArrayPush(pInfo->pPrevRow, &key); + } + + pInfo->isPrevRowSet = false; + + return TSDB_CODE_SUCCESS; +} + +static int32_t initNextRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { + if (pInfo->pNextRow != NULL) { + return TSDB_CODE_SUCCESS; + } + + pInfo->pNextRow = taosArrayInit(4, sizeof(SGroupKeys)); + if (pInfo->pNextRow == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + + SGroupKeys key = {0}; + key.bytes = pColInfo->info.bytes; + key.type = pColInfo->info.type; + key.isNull = false; + key.pData = taosMemoryCalloc(1, pColInfo->info.bytes); + taosArrayPush(pInfo->pNextRow, &key); + } + + pInfo->isNextRowSet = false; + + return TSDB_CODE_SUCCESS; +} + +static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { + if (pInfo->pLinearInfo != NULL) { + return TSDB_CODE_SUCCESS; + } + + pInfo->pLinearInfo = taosArrayInit(4, sizeof(SFillLinearInfo)); + if (pInfo->pLinearInfo == NULL) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock); + for (int32_t i = 0; i < numOfCols; ++i) { + SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i); + + SFillLinearInfo linearInfo = {0}; + linearInfo.start.key = INT64_MIN; + linearInfo.end.key = INT64_MIN; + linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes); + linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes); + linearInfo.isStartSet = false; + linearInfo.isEndSet = false; + linearInfo.type = pColInfo->info.type; + linearInfo.bytes = pColInfo->info.bytes; + taosArrayPush(pInfo->pLinearInfo, &linearInfo); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) { + int32_t code; + code = initPrevRowsKeeper(pInfo, pBlock); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_FAILED; + } + + code = initNextRowsKeeper(pInfo, pBlock); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_FAILED; + } + + code = initFillLinearInfo(pInfo, pBlock); + if (code != TSDB_CODE_SUCCESS) { + return TSDB_CODE_FAILED; + } + + return TSDB_CODE_SUCCESS; +} + +static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { + if (pOperator->status == OP_EXEC_DONE) { + return NULL; + } + + SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + + STimeSliceOperatorInfo* pSliceInfo = pOperator->info; + SSDataBlock* pResBlock = pSliceInfo->pRes; + SExprSupp* pSup = &pOperator->exprSupp; + + int32_t order = TSDB_ORDER_ASC; + SInterval* pInterval = &pSliceInfo->interval; + SOperatorInfo* downstream = pOperator->pDownstream[0]; + + blockDataCleanup(pResBlock); + + while (1) { + SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + if (pBlock == NULL) { + break; + } + + int32_t code = initKeeperInfo(pSliceInfo, pBlock); + if (code != TSDB_CODE_SUCCESS) { + T_LONG_JMP(pTaskInfo->env, code); + } + + // the pDataBlock are always the same one, no need to call this again + setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true); + + SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId); + for (int32_t i = 0; i < pBlock->info.rows; ++i) { + int64_t ts = *(int64_t*)colDataGetData(pTsCol, i); + + if (pSliceInfo->current > pSliceInfo->win.ekey) { + setOperatorCompleted(pOperator); + break; + } + + if (ts == pSliceInfo->current) { + addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); + + doKeepPrevRows(pSliceInfo, pBlock, i); + doKeepLinearInfo(pSliceInfo, pBlock, i); + + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + if (pSliceInfo->current > pSliceInfo->win.ekey) { + setOperatorCompleted(pOperator); + break; + } + } else if (ts < pSliceInfo->current) { + // in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate + doKeepPrevRows(pSliceInfo, pBlock, i); + doKeepLinearInfo(pSliceInfo, pBlock, i); + + if (i < pBlock->info.rows - 1) { + // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate + doKeepNextRows(pSliceInfo, pBlock, i + 1); + int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1); + if (nextTs > pSliceInfo->current) { + while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) { + if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false) && pSliceInfo->fillType == TSDB_FILL_LINEAR) { + break; + } else { + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + } + } + + if (pSliceInfo->current > pSliceInfo->win.ekey) { + setOperatorCompleted(pOperator); + break; + } + } else { + // ignore current row, and do nothing + } + } else { // it is the last row of current block + doKeepPrevRows(pSliceInfo, pBlock, i); + } + } else { // ts > pSliceInfo->current + // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate + doKeepNextRows(pSliceInfo, pBlock, i); + doKeepLinearInfo(pSliceInfo, pBlock, i); + + while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) { + if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, true) && pSliceInfo->fillType == TSDB_FILL_LINEAR) { + break; + } else { + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + } + } + + // add current row if timestamp match + if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) { + addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i); + doKeepPrevRows(pSliceInfo, pBlock, i); + + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + } + + if (pSliceInfo->current > pSliceInfo->win.ekey) { + setOperatorCompleted(pOperator); + break; + } + } + } + } + + // check if need to interpolate after last datablock + // except for fill(next), fill(linear) + while (pSliceInfo->current <= pSliceInfo->win.ekey && pSliceInfo->fillType != TSDB_FILL_NEXT && + pSliceInfo->fillType != TSDB_FILL_LINEAR) { + genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, false); + pSliceInfo->current = + taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision); + } + + // restore the value + setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED); + if (pResBlock->info.rows == 0) { + pOperator->status = OP_EXEC_DONE; + } + + return pResBlock->info.rows == 0 ? NULL : pResBlock; +} + +SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo) { + STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo)); + SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + if (pOperator == NULL || pInfo == NULL) { + goto _error; + } + + SInterpFuncPhysiNode* pInterpPhyNode = (SInterpFuncPhysiNode*)pPhyNode; + SExprSupp* pSup = &pOperator->exprSupp; + + int32_t numOfExprs = 0; + SExprInfo* pExprInfo = createExprInfo(pInterpPhyNode->pFuncs, NULL, &numOfExprs); + int32_t code = initExprSupp(pSup, pExprInfo, numOfExprs); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + + if (pInterpPhyNode->pExprs != NULL) { + int32_t num = 0; + SExprInfo* pScalarExprInfo = createExprInfo(pInterpPhyNode->pExprs, NULL, &num); + code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num); + if (code != TSDB_CODE_SUCCESS) { + goto _error; + } + } + + pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries); + pInfo->fillType = convertFillType(pInterpPhyNode->fillMode); + initResultSizeInfo(&pOperator->resultInfo, 4096); + + pInfo->pFillColInfo = createFillColInfo(pExprInfo, numOfExprs, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues); + pInfo->pLinearInfo = NULL; + pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); + pInfo->win = pInterpPhyNode->timeRange; + pInfo->interval.interval = pInterpPhyNode->interval; + pInfo->current = pInfo->win.skey; + + if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { + STableScanInfo* pScanInfo = (STableScanInfo*)downstream->info; + pScanInfo->base.cond.twindows = pInfo->win; + pScanInfo->base.cond.type = TIMEWINDOW_RANGE_EXTERNAL; + } + + setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo, + pTaskInfo); + pOperator->fpSet = createOperatorFpSet(operatorDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, NULL); + + blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); + + code = appendDownstream(pOperator, &downstream, 1); + return pOperator; + + _error: + taosMemoryFree(pInfo); + taosMemoryFree(pOperator); + pTaskInfo->code = TSDB_CODE_OUT_OF_MEMORY; + return NULL; +} + +void destroyTimeSliceOperatorInfo(void* param) { + STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param; + + pInfo->pRes = blockDataDestroy(pInfo->pRes); + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) { + SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i); + taosMemoryFree(pKey->pData); + } + taosArrayDestroy(pInfo->pPrevRow); + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pNextRow); ++i) { + SGroupKeys* pKey = taosArrayGet(pInfo->pNextRow, i); + taosMemoryFree(pKey->pData); + } + taosArrayDestroy(pInfo->pNextRow); + + for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) { + SFillLinearInfo* pKey = taosArrayGet(pInfo->pLinearInfo, i); + taosMemoryFree(pKey->start.val); + taosMemoryFree(pKey->end.val); + } + taosArrayDestroy(pInfo->pLinearInfo); + + taosMemoryFree(pInfo->pFillColInfo); + taosMemoryFreeClear(param); +} From d8da10f2bb697dddb79bcdf9958521d1e8e5114e Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Mon, 28 Nov 2022 00:51:18 +0800 Subject: [PATCH 3/3] refactor: do some internal refactor. --- source/libs/executor/inc/executil.h | 30 ++--- source/libs/executor/inc/executorimpl.h | 61 ++++------- source/libs/executor/src/executor.c | 17 +-- source/libs/executor/src/executorimpl.c | 103 +----------------- source/libs/executor/src/groupoperator.c | 2 +- source/libs/executor/src/projectoperator.c | 4 +- source/libs/executor/src/timewindowoperator.c | 35 ++---- 7 files changed, 56 insertions(+), 196 deletions(-) diff --git a/source/libs/executor/inc/executil.h b/source/libs/executor/inc/executil.h index d5366f1b7a..875528576d 100644 --- a/source/libs/executor/inc/executil.h +++ b/source/libs/executor/inc/executil.h @@ -38,16 +38,7 @@ memcpy((_k) + sizeof(uint64_t), (_ori), (_len)); \ } while (0) -#define SET_RES_EXT_WINDOW_KEY(_k, _ori, _len, _uid, _buf) \ - do { \ - assert(sizeof(_uid) == sizeof(uint64_t)); \ - *(void**)(_k) = (_buf); \ - *(uint64_t*)((_k) + POINTER_BYTES) = (_uid); \ - memcpy((_k) + POINTER_BYTES + sizeof(uint64_t), (_ori), (_len)); \ - } while (0) - #define GET_RES_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t)) -#define GET_RES_EXT_WINDOW_KEY_LEN(_l) ((_l) + sizeof(uint64_t) + POINTER_BYTES) #define GET_TASKID(_t) (((SExecTaskInfo*)(_t))->id.str) @@ -104,16 +95,17 @@ int32_t createScanTableListInfo(SScanPhysiNode* pScanNode, SNodeList* pGroupTags STableListInfo* pTableListInfo, SNode* pTagCond, SNode* pTagIndexCond, SExecTaskInfo* pTaskInfo); STableListInfo* tableListCreate(); -void* tableListDestroy(STableListInfo* pTableListInfo); -void tableListClear(STableListInfo* pTableListInfo); -int32_t tableListGetOutputGroups(const STableListInfo* pTableList); -bool oneTableForEachGroup(const STableListInfo* pTableList); -uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); -int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid); -int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, int32_t* num); -uint64_t tableListGetSize(const STableListInfo* pTableList); -uint64_t tableListGetSuid(const STableListInfo* pTableList); -STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); +void* tableListDestroy(STableListInfo* pTableListInfo); +void tableListClear(STableListInfo* pTableListInfo); +int32_t tableListGetOutputGroups(const STableListInfo* pTableList); +bool oneTableForEachGroup(const STableListInfo* pTableList); +uint64_t getTableGroupId(const STableListInfo* pTableList, uint64_t tableUid); +int32_t tableListAddTableInfo(STableListInfo* pTableList, uint64_t uid, uint64_t gid); +int32_t tableListGetGroupList(const STableListInfo* pTableList, int32_t ordinalIndex, STableKeyInfo** pKeyInfo, + int32_t* num); +uint64_t tableListGetSize(const STableListInfo* pTableList); +uint64_t tableListGetSuid(const STableListInfo* pTableList); +STableKeyInfo* tableListGetInfo(const STableListInfo* pTableList, int32_t index); size_t getResultRowSize(struct SqlFunctionCtx* pCtx, int32_t numOfOutput); void initResultRowInfo(SResultRowInfo* pResultRowInfo); diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index bd4472327c..8163217039 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -665,18 +665,25 @@ typedef struct SStreamFillOperatorInfo { SOperatorFpSet createOperatorFpSet(__optr_open_fn_t openFn, __optr_fn_t nextFn, __optr_fn_t cleanup, __optr_close_fn_t closeFn, __optr_explain_fn_t explain); - -int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); -int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); +int32_t operatorDummyOpenFn(SOperatorInfo* pOperator); +int32_t appendDownstream(SOperatorInfo* p, SOperatorInfo** pDownstream, int32_t num); +void setOperatorCompleted(SOperatorInfo* pOperator); +void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, + void* pInfo, SExecTaskInfo* pTaskInfo); +void destroyOperatorInfo(SOperatorInfo* pOperator); void initBasicInfo(SOptrBasicInfo* pInfo, SSDataBlock* pBlock); void cleanupBasicInfo(SOptrBasicInfo* pInfo); + int32_t initExprSupp(SExprSupp* pSup, SExprInfo* pExprInfo, int32_t numOfExpr); void cleanupExprSupp(SExprSupp* pSup); + void destroyExprInfo(SExprInfo* pExpr, int32_t numOfExprs); -int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, - const char* pkey); +int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, + const char* pkey); +void cleanupAggSup(SAggSupporter* pAggSup); + void initResultSizeInfo(SResultInfo* pResultInfo, int32_t numOfRows); void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGroupResInfo* pGroupResInfo, @@ -702,14 +709,10 @@ int32_t getBufferPgSize(int32_t rowSize, uint32_t* defaultPgsz, uint32_t* defaul extern void doDestroyExchangeOperatorInfo(void* param); -void setOperatorCompleted(SOperatorInfo* pOperator); -void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, - void* pInfo, SExecTaskInfo* pTaskInfo); void doFilter(SSDataBlock* pBlock, SFilterInfo* pFilterInfo, SColMatchInfo* pColMatchInfo); int32_t addTagPseudoColumnData(SReadHandle* pHandle, const SExprInfo* pExpr, int32_t numOfExpr, SSDataBlock* pBlock, int32_t rows, const char* idStr, STableMetaCacheInfo* pCache); -void cleanupAggSup(SAggSupporter* pAggSup); void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle); void setTbNameColData(const SSDataBlock* pBlock, SColumnInfoData* pColInfoData, int32_t functionId, const char* name); @@ -724,6 +727,8 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* pHandle, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, SExecTaskInfo* pTaskInfo); + SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScanPhysiNode* pScanPhyNode, const char* pUser, SExecTaskInfo* pTaskInfo); @@ -779,6 +784,8 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo); SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode, SExecTaskInfo* pTaskInfo); + +SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, SExecTaskInfo* pTaskInfo); // clang-format on int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBlock* pSrcBlock, SqlFunctionCtx* pCtx, @@ -786,38 +793,22 @@ int32_t projectApplyFunctions(SExprInfo* pExpr, SSDataBlock* pResult, SSDataBloc void setInputDataBlock(SExprSupp* pExprSupp, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); -bool isTaskKilled(SExecTaskInfo* pTaskInfo); int32_t checkForQueryBuf(size_t numOfTables); +bool isTaskKilled(SExecTaskInfo* pTaskInfo); void setTaskKilled(SExecTaskInfo* pTaskInfo); -void queryCostStatis(SExecTaskInfo* pTaskInfo); void doDestroyTask(SExecTaskInfo* pTaskInfo); -void destroyOperatorInfo(SOperatorInfo* pOperator); -int32_t getMaximumIdleDurationSec(); - -/* - * ops: root operator - * data: *data save the result of encode, need to be freed by caller - * length: *length save the length of *data - * nOptrWithVal: *nOptrWithVal save the number of optr with value - * return: result code, 0 means success - */ -int32_t encodeOperator(SOperatorInfo* ops, char** data, int32_t* length, int32_t* nOptrWithVal); - -/* - * ops: root operator, created by caller - * data: save the result of decode - * length: the length of data - * return: result code, 0 means success - */ -int32_t decodeOperator(SOperatorInfo* ops, const char* data, int32_t length); - void setTaskStatus(SExecTaskInfo* pTaskInfo, int8_t status); + int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SReadHandle* pHandle, uint64_t taskId, char* sql, EOPTR_EXEC_MODEL model); int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle); int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInfoList); +void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo); + +int32_t getMaximumIdleDurationSec(); + STimeWindow getActiveTimeWindow(SDiskbasedBuf* pBuf, SResultRowInfo* pResultRowInfo, int64_t ts, SInterval* pInterval, int32_t order); int32_t getNumOfRowsInTimeWindow(SDataBlockInfo* pDataBlockInfo, TSKEY* pPrimaryColumn, int32_t startPos, TSKEY ekey, @@ -840,15 +831,7 @@ void calBlockTbName(SStreamScanInfo* pInfo, SSDataBlock* pBlock); int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPosition, SExprSupp* pSup, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); -SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSortPhysiNode* pSortPhyNode, - SExecTaskInfo* pTaskInfo); -SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanNode, SReadHandle* readHandle, - SExecTaskInfo* pTaskInfo); - -void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex); - bool groupbyTbname(SNodeList* pGroupList); -void* destroySqlFunctionCtx(SqlFunctionCtx* pCtx, int32_t numOfOutput); int32_t buildDataBlockFromGroupRes(SOperatorInfo* pOperator, SStreamState* pState, SSDataBlock* pBlock, SExprSupp* pSup, SGroupResInfo* pGroupResInfo); int32_t saveSessionDiscBuf(SStreamState* pState, SSessionKey* key, void* buf, int32_t size); diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 9b3bd1d808..34bd9cf8ca 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -712,7 +712,7 @@ void qDestroyTask(qTaskInfo_t qTaskHandle) { qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows); - queryCostStatis(pTaskInfo); // print the query cost summary + printTaskExecCostInLog(pTaskInfo); // print the query cost summary doDestroyTask(pTaskInfo); } @@ -728,12 +728,12 @@ int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) { } int32_t nOptrWithVal = 0; - int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal); - if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) { - taosMemoryFreeClear(*pOutput); - *len = 0; - } - return code; +// int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal); +// if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) { +// taosMemoryFreeClear(*pOutput); +// *len = 0; +// } + return 0; } int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) { @@ -743,7 +743,8 @@ int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t le return TSDB_CODE_INVALID_PARA; } - return decodeOperator(pTaskInfo->pRoot, pInput, len); + return 0; +// return decodeOperator(pTaskInfo->pRoot, pInput, len); } int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 5eaa8ba8dd..5abde1be85 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -1335,7 +1335,7 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG } } -void queryCostStatis(SExecTaskInfo* pTaskInfo) { +void printTaskExecCostInLog(SExecTaskInfo* pTaskInfo) { STaskCostInfo* pSummary = &pTaskInfo->cost; SFileBlockLoadRecorder* pRecorder = pSummary->pRecoder; @@ -1958,7 +1958,7 @@ void cleanupAggSup(SAggSupporter* pAggSup) { destroyDiskbasedBuf(pAggSup->pResultBuf); } -int32_t initAggInfo(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, +int32_t initAggSup(SExprSupp* pSup, SAggSupporter* pAggSup, SExprInfo* pExprInfo, int32_t numOfCols, size_t keyBufSize, const char* pkey) { int32_t code = initExprSupp(pSup, pExprInfo, numOfCols); if (code != TSDB_CODE_SUCCESS) { @@ -2056,7 +2056,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); - int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2734,103 +2734,6 @@ int32_t rebuildReader(SOperatorInfo* pOperator, SSubplan* plan, SReadHandle* pHa } #endif -int32_t encodeOperator(SOperatorInfo* ops, char** result, int32_t* length, int32_t* nOptrWithVal) { - int32_t code = TDB_CODE_SUCCESS; - char* pCurrent = NULL; - int32_t currLength = 0; - if (ops->fpSet.encodeResultRow) { - if (result == NULL || length == NULL || nOptrWithVal == NULL) { - return TSDB_CODE_TSC_INVALID_INPUT; - } - code = ops->fpSet.encodeResultRow(ops, &pCurrent, &currLength); - - if (code != TDB_CODE_SUCCESS) { - if (*result != NULL) { - taosMemoryFree(*result); - *result = NULL; - } - return code; - } else if (currLength == 0) { - ASSERT(!pCurrent); - goto _downstream; - } - - ++(*nOptrWithVal); - - ASSERT(currLength >= 0); - - if (*result == NULL) { - *result = (char*)taosMemoryCalloc(1, currLength + sizeof(int32_t)); - if (*result == NULL) { - taosMemoryFree(pCurrent); - return TSDB_CODE_OUT_OF_MEMORY; - } - memcpy(*result + sizeof(int32_t), pCurrent, currLength); - *(int32_t*)(*result) = currLength + sizeof(int32_t); - } else { - int32_t sizePre = *(int32_t*)(*result); - char* tmp = (char*)taosMemoryRealloc(*result, sizePre + currLength); - if (tmp == NULL) { - taosMemoryFree(pCurrent); - taosMemoryFree(*result); - *result = NULL; - return TSDB_CODE_OUT_OF_MEMORY; - } - *result = tmp; - memcpy(*result + sizePre, pCurrent, currLength); - *(int32_t*)(*result) += currLength; - } - taosMemoryFree(pCurrent); - *length = *(int32_t*)(*result); - } - -_downstream: - for (int32_t i = 0; i < ops->numOfDownstream; ++i) { - code = encodeOperator(ops->pDownstream[i], result, length, nOptrWithVal); - if (code != TDB_CODE_SUCCESS) { - return code; - } - } - return TDB_CODE_SUCCESS; -} - -int32_t decodeOperator(SOperatorInfo* ops, const char* result, int32_t length) { - int32_t code = TDB_CODE_SUCCESS; - if (ops->fpSet.decodeResultRow) { - if (result == NULL) { - return TSDB_CODE_TSC_INVALID_INPUT; - } - - ASSERT(length == *(int32_t*)result); - - const char* data = result + sizeof(int32_t); - code = ops->fpSet.decodeResultRow(ops, (char*)data); - if (code != TDB_CODE_SUCCESS) { - return code; - } - - int32_t totalLength = *(int32_t*)result; - int32_t dataLength = *(int32_t*)data; - - if (totalLength == dataLength + sizeof(int32_t)) { // the last data - result = NULL; - length = 0; - } else { - result += dataLength; - *(int32_t*)(result) = totalLength - dataLength; - length = totalLength - dataLength; - } - } - - for (int32_t i = 0; i < ops->numOfDownstream; ++i) { - code = decodeOperator(ops->pDownstream[i], result, length); - if (code != TDB_CODE_SUCCESS) { - return code; - } - } - return TDB_CODE_SUCCESS; -} - int32_t createDataSinkParam(SDataSinkNode* pNode, void** pParam, qTaskInfo_t* pTaskInfo, SReadHandle* readHandle) { SExecTaskInfo* pTask = *(SExecTaskInfo**)pTaskInfo; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index bbf9bd2a27..6dc8818900 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -456,7 +456,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pAggNode->pAggFuncs, pAggNode->pGroupKeys, &num); - code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, pInfo->groupKeyLen, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 819997c521..4bba3a72e1 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -102,7 +102,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys } initResultSizeInfo(&pOperator->resultInfo, numOfRows); - code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -400,7 +400,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy initResultSizeInfo(&pOperator->resultInfo, numOfRows); blockDataEnsureCapacity(pResBlock, numOfRows); - int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfExpr, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 80c3c1c454..0e0ec5b339 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1741,7 +1741,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pPhyNode->window.pFuncs, NULL, &num); - int32_t code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2001,7 +2001,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi SExprInfo* pExprInfo = createExprInfo(pStateNode->window.pFuncs, NULL, &num); initResultSizeInfo(&pOperator->resultInfo, 4096); - code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2069,7 +2069,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW SSDataBlock* pResBlock = createDataBlockFromDescNode(pSessionNode->window.node.pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); - int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -2243,26 +2243,6 @@ static void clearSpecialDataBlock(SSDataBlock* pBlock) { blockDataCleanup(pBlock); } -void copyUpdateDataBlock(SSDataBlock* pDest, SSDataBlock* pSource, int32_t tsColIndex) { - // ASSERT(pDest->info.capacity >= pSource->info.rows); - blockDataEnsureCapacity(pDest, pSource->info.rows); - clearSpecialDataBlock(pDest); - SColumnInfoData* pDestCol = taosArrayGet(pDest->pDataBlock, 0); - SColumnInfoData* pSourceCol = taosArrayGet(pSource->pDataBlock, tsColIndex); - - // copy timestamp column - colDataAssign(pDestCol, pSourceCol, pSource->info.rows, &pDest->info); - for (int32_t i = 1; i < taosArrayGetSize(pDest->pDataBlock); i++) { - SColumnInfoData* pCol = taosArrayGet(pDest->pDataBlock, i); - colDataAppendNNULL(pCol, 0, pSource->info.rows); - } - - pDest->info.rows = pSource->info.rows; - pDest->info.groupId = pSource->info.groupId; - pDest->info.type = pSource->info.type; - blockDataUpdateTsWindow(pDest, 0); -} - static void doBuildPullDataBlock(SArray* array, int32_t* pIndex, SSDataBlock* pBlock) { clearSpecialDataBlock(pBlock); int32_t size = taosArrayGetSize(array); @@ -2707,7 +2687,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, SSDataBlock* pResBlock = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc); initBasicInfo(&pInfo->binfo, pResBlock); - int32_t code = initAggInfo(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggSup(&pOperator->exprSupp, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4346,7 +4326,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, int32_t num = 0; SExprInfo* pExprInfo = createExprInfo(pNode->window.pFuncs, NULL, &num); - code = initAggInfo(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + code = initAggSup(&pOperator->exprSupp, &iaInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4651,7 +4631,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; initResultSizeInfo(&pOperator->resultInfo, 4096); - int32_t code = initAggInfo(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); + int32_t code = initAggSup(pExprSupp, &pIntervalInfo->aggSup, pExprInfo, num, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4847,7 +4827,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys initResultSizeInfo(&pOperator->resultInfo, 4096); size_t keyBufSize = sizeof(int64_t) + sizeof(int64_t) + POINTER_BYTES; - code = initAggInfo(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); + code = initAggSup(pSup, &pInfo->aggSup, pExprInfo, numOfCols, keyBufSize, pTaskInfo->id.str); if (code != TSDB_CODE_SUCCESS) { goto _error; } @@ -4901,3 +4881,4 @@ _error: pTaskInfo->code = code; return NULL; } +