From 7b9d73c77a482c628d18f12c10f13f9a5fa970e2 Mon Sep 17 00:00:00 2001 From: dapan1121 Date: Wed, 5 Jul 2023 11:07:08 +0800 Subject: [PATCH] feat: add dynamic query ctrl operator --- include/common/tmsg.h | 1 + include/libs/nodes/plannodes.h | 6 +- source/libs/executor/inc/dynqueryctrl.h | 24 +- source/libs/executor/inc/executorInt.h | 25 +- source/libs/executor/inc/groupcache.h | 2 +- source/libs/executor/inc/hashjoin.h | 1 + source/libs/executor/inc/operator.h | 21 +- source/libs/executor/src/aggregateoperator.c | 4 +- source/libs/executor/src/cachescanoperator.c | 2 +- .../libs/executor/src/dynqueryctrloperator.c | 221 +++++++++++++++++- .../libs/executor/src/eventwindowoperator.c | 4 +- source/libs/executor/src/exchangeoperator.c | 64 ++++- source/libs/executor/src/filloperator.c | 8 +- source/libs/executor/src/groupcacheoperator.c | 20 +- source/libs/executor/src/groupoperator.c | 12 +- source/libs/executor/src/hashjoinoperator.c | 14 +- source/libs/executor/src/mergejoinoperator.c | 13 +- source/libs/executor/src/operator.c | 60 ++++- source/libs/executor/src/projectoperator.c | 8 +- source/libs/executor/src/scanoperator.c | 14 +- source/libs/executor/src/sortoperator.c | 8 +- source/libs/executor/src/sysscanoperator.c | 4 +- source/libs/executor/src/timesliceoperator.c | 4 +- source/libs/executor/src/timewindowoperator.c | 40 ++-- source/libs/planner/src/planSpliter.c | 1 + 25 files changed, 460 insertions(+), 121 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index fa092a453c..f4c0cae27c 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1848,6 +1848,7 @@ typedef struct { uint64_t queryId; uint64_t taskId; int32_t execId; + void* opParam; } SResFetchReq; int32_t tSerializeSResFetchReq(void* buf, int32_t bufLen, SResFetchReq* pReq); diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 6ef8a92667..74c3120d87 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -444,16 +444,16 @@ typedef struct SGroupCachePhysiNode { SNodeList* pGroupCols; } SGroupCachePhysiNode; -typedef struct SStbJoinDynCtrlInfo { +typedef struct SStbJoinDynCtrlBasic { int32_t vgSlot[2]; int32_t uidSlot[2]; -} SStbJoinDynCtrlInfo; +} SStbJoinDynCtrlBasic; typedef struct SDynQueryCtrlPhysiNode { SPhysiNode node; EDynQueryType qType; union { - SStbJoinDynCtrlInfo stbJoin; + SStbJoinDynCtrlBasic stbJoin; }; } SDynQueryCtrlPhysiNode; diff --git a/source/libs/executor/inc/dynqueryctrl.h b/source/libs/executor/inc/dynqueryctrl.h index d5cd52e004..8816e15a2f 100755 --- a/source/libs/executor/inc/dynqueryctrl.h +++ b/source/libs/executor/inc/dynqueryctrl.h @@ -19,8 +19,30 @@ extern "C" { #endif +typedef struct SStbJoinPrevJoinCtx { + SSDataBlock* pLastBlk; + int32_t lastRow; +} SStbJoinPrevJoinCtx; + +typedef struct SStbJoinPostJoinCtx { + bool isStarted; +} SStbJoinPostJoinCtx; + +typedef struct SStbJoinDynCtrlCtx { + SStbJoinPrevJoinCtx prev; + SStbJoinPostJoinCtx post; +} SStbJoinDynCtrlCtx; + +typedef struct SStbJoinDynCtrlInfo { + SStbJoinDynCtrlBasic basic; + SStbJoinDynCtrlCtx ctx; +} SStbJoinDynCtrlInfo; + typedef struct SDynQueryCtrlOperatorInfo { - SStbJoinDynCtrlInfo ctrlInfo; + EDynQueryType qType; + union { + SStbJoinDynCtrlInfo stbJoin; + }; } SDynQueryCtrlOperatorInfo; #ifdef __cplusplus diff --git a/source/libs/executor/inc/executorInt.h b/source/libs/executor/inc/executorInt.h index 5d663df50e..7e080b8bf2 100644 --- a/source/libs/executor/inc/executorInt.h +++ b/source/libs/executor/inc/executorInt.h @@ -115,8 +115,9 @@ typedef struct SExprSupp { typedef enum { EX_SOURCE_DATA_NOT_READY = 0x1, - EX_SOURCE_DATA_READY = 0x2, - EX_SOURCE_DATA_EXHAUSTED = 0x3, + EX_SOURCE_DATA_STARTED, + EX_SOURCE_DATA_READY, + EX_SOURCE_DATA_EXHAUSTED, } EX_SOURCE_STATUS; #define COL_MATCH_FROM_COL_ID 0x1 @@ -138,11 +139,22 @@ typedef struct SLimitInfo { int64_t numOfOutputRows; } SLimitInfo; +typedef struct SSortMergeJoinOperatorParam { + SOperatorParam* pChild; +} SSortMergeJoinOperatorParam; + +typedef struct SExchangeOperatorParam { + SOperatorParam* pChild; + int32_t vgId; + SArray* uidList; +} SExchangeOperatorParam; + typedef struct SExchangeInfo { - SArray* pSources; - SArray* pSourceDataInfo; - tsem_t ready; - void* pTransporter; + SArray* pSources; + SSHashObj* pHashSources; + SArray* pSourceDataInfo; + tsem_t ready; + void* pTransporter; // SArray, result block list, used to keep the multi-block that // passed by downstream operator @@ -150,6 +162,7 @@ typedef struct SExchangeInfo { SArray* pRecycledBlocks; // build a pool for small data block to avoid to repeatly create and then destroy. SSDataBlock* pDummyBlock; // dummy block, not keep data bool seqLoadData; // sequential load data or not, false by default + bool dynamicOp; int32_t current; SLoadRemoteDataInfo loadInfo; uint64_t self; diff --git a/source/libs/executor/inc/groupcache.h b/source/libs/executor/inc/groupcache.h index 96aba1d7bc..b828f5abf2 100755 --- a/source/libs/executor/inc/groupcache.h +++ b/source/libs/executor/inc/groupcache.h @@ -22,7 +22,7 @@ extern "C" { #define GROUP_CACHE_DEFAULT_PAGE_SIZE 10485760 typedef struct SGcOperatorParam { - SOperatorBasicParam basic; + SOperatorParam* pChild; int64_t sessionId; int32_t downstreamIdx; bool needCache; diff --git a/source/libs/executor/inc/hashjoin.h b/source/libs/executor/inc/hashjoin.h index a4a180d542..2207acccb2 100755 --- a/source/libs/executor/inc/hashjoin.h +++ b/source/libs/executor/inc/hashjoin.h @@ -64,6 +64,7 @@ typedef struct SGroupData { } SGroupData; typedef struct SHJoinTableInfo { + int32_t downStreamIdx; SOperatorInfo* downStream; int32_t blkId; SQueryStat inputStat; diff --git a/source/libs/executor/inc/operator.h b/source/libs/executor/inc/operator.h index 5c42323061..39786b9a5e 100644 --- a/source/libs/executor/inc/operator.h +++ b/source/libs/executor/inc/operator.h @@ -27,19 +27,17 @@ typedef struct SOperatorCostInfo { struct SOperatorInfo; -typedef struct SOperatorBasicParam { - bool newExec; -} SOperatorBasicParam; - typedef struct SOperatorSpecParam { - int32_t opType; - void* value; + int32_t opType; + void* value; } SOperatorSpecParam; +typedef struct SOperatorBaseParam { + SOperatorParam* pChild; +} SOperatorBaseParam; + typedef struct SOperatorParam { - SOperatorBasicParam basic; - int32_t opNum; - SOperatorSpecParam* pOpParams; + SArray* pOpParams; //SArray } SOperatorParam; typedef int32_t (*__optr_encode_fn_t)(struct SOperatorInfo* pOperator, char** result, int32_t* length); @@ -84,6 +82,8 @@ typedef struct SOperatorInfo { SExecTaskInfo* pTaskInfo; SOperatorCostInfo cost; SResultInfo resultInfo; + SOperatorBaseParam* pOperatorParam; + SOperatorParam** pDownstreamParams; struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator SOperatorFpSet fpSet; @@ -174,6 +174,7 @@ void setOperatorCompleted(SOperatorInfo* pOperator); void setOperatorInfo(SOperatorInfo* pOperator, const char* name, int32_t type, bool blocking, int32_t status, void* pInfo, SExecTaskInfo* pTaskInfo); int32_t optrDefaultBufFn(SOperatorInfo* pOperator); +SSDataBlock* optrDefaultGetExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam); SOperatorInfo* createOperator(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SNode* pTagCond, SNode* pTagIndexCond, const char* pUser, const char* dbname); @@ -183,7 +184,7 @@ SOperatorInfo* extractOperatorInTree(SOperatorInfo* pOperator, int32_t type, con int32_t getTableScanInfo(SOperatorInfo* pOperator, int32_t* order, int32_t* scanFlag, bool inheritUsOrder); int32_t stopTableScanOperator(SOperatorInfo* pOperator, const char* pIdStr, SStorageAPI* pAPI); int32_t getOperatorExplainExecInfo(struct SOperatorInfo* operatorInfo, SArray* pExecInfoList); -void * getOperatorParam(int32_t opType, SOperatorParam* param); +void * getOperatorParam(int32_t opType, SOperatorParam* param, int32_t idx); #ifdef __cplusplus } diff --git a/source/libs/executor/src/aggregateoperator.c b/source/libs/executor/src/aggregateoperator.c index 5e28d73084..7a01a2e16b 100644 --- a/source/libs/executor/src/aggregateoperator.c +++ b/source/libs/executor/src/aggregateoperator.c @@ -114,7 +114,7 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN setOperatorInfo(pOperator, "TableAggregate", QUERY_NODE_PHYSICAL_PLAN_HASH_AGG, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(doOpenAggregateOptr, getAggregateResult, NULL, destroyAggOperatorInfo, - optrDefaultBufFn, NULL, NULL, NULL); + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) { STableScanInfo* pTableScanInfo = downstream->info; @@ -172,7 +172,7 @@ int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) { while (1) { bool blockAllocated = false; - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { if (!hasValidBlock) { createDataBlockForEmptyInput(pOperator, &pBlock); diff --git a/source/libs/executor/src/cachescanoperator.c b/source/libs/executor/src/cachescanoperator.c index 0732cdaa3d..52d0068f7d 100644 --- a/source/libs/executor/src/cachescanoperator.c +++ b/source/libs/executor/src/cachescanoperator.c @@ -134,7 +134,7 @@ SOperatorInfo* createCacherowsScanOperator(SLastRowScanPhysiNode* pScanNode, SRe pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, optrDefaultBufFn, NULL, NULL, NULL); + createOperatorFpSet(optrDummyOpenFn, doScanCache, NULL, destroyCacheScanOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); pOperator->cost.openCost = 0; return pOperator; diff --git a/source/libs/executor/src/dynqueryctrloperator.c b/source/libs/executor/src/dynqueryctrloperator.c index e9ee6bb970..db124b3a9d 100755 --- a/source/libs/executor/src/dynqueryctrloperator.c +++ b/source/libs/executor/src/dynqueryctrloperator.c @@ -27,28 +27,227 @@ #include "ttypes.h" #include "dynqueryctrl.h" +int64_t gSessionId = 0; + static void destroyDynQueryCtrlOperator(void* param) { SDynQueryCtrlOperatorInfo* pDynCtrlOperator = (SDynQueryCtrlOperatorInfo*)param; - taosMemoryFreeClear(param); } -SSDataBlock* getBlockFromDynQueryCtrl(SOperatorInfo* pOperator) { +static FORCE_INLINE int32_t buildGroupCacheOperatorParam(SOperatorParam** ppRes, int32_t downstreamIdx, bool needCache, void* pGrpValue, int32_t grpValSize, SOperatorParam* pChild) { + *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*ppRes)->pOpParams = taosArrayInit(1, sizeof(SOperatorSpecParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SGcOperatorParam* pGc = taosMemoryMalloc(sizeof(SGcOperatorParam)); + if (NULL == pGc) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pGc->sessionId = atomic_add_fetch_64(&gSessionId, 1); + pGc->downstreamIdx = downstreamIdx; + pGc->needCache = needCache; + pGc->pGroupValue = pGrpValue; + pGc->groupValueSize = grpValSize; + + SOperatorSpecParam specParam; + specParam.opType = QUERY_NODE_PHYSICAL_PLAN_GROUP_CACHE; + specParam.value = pGc; + + taosArrayPush((*ppRes)->pOpParams, &specParam); + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int32_t buildExchangeOperatorParam(SOperatorParam** ppRes, int32_t* pVgId, int64_t* pUid, SOperatorParam* pChild) { + *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*ppRes)->pOpParams = taosArrayInit(1, sizeof(SOperatorSpecParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SExchangeOperatorParam* pExc = taosMemoryMalloc(sizeof(SExchangeOperatorParam)); + if (NULL == pExc) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + pExc->pChild = pChild; + pExc->vgId = *pVgId; + pExc->uidList = taosArrayInit(1, sizeof(int64_t)); + if (NULL == pExc->uidList) { + taosMemoryFree(pExc); + return TSDB_CODE_OUT_OF_MEMORY; + } + taosArrayPush(pExc->uidList, pUid); + + SOperatorSpecParam specParam; + specParam.opType = QUERY_NODE_PHYSICAL_PLAN_EXCHANGE; + specParam.value = pExc; + + taosArrayPush((*ppRes)->pOpParams, &specParam); + + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int32_t buildMergeJoinOperatorParam(SOperatorParam** ppRes, SOperatorParam* pChild0, SOperatorParam* pChild1) { + *ppRes = taosMemoryMalloc(sizeof(SOperatorParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + (*ppRes)->pOpParams = taosArrayInit(2, sizeof(SOperatorSpecParam)); + if (NULL == *ppRes) { + return TSDB_CODE_OUT_OF_MEMORY; + } + + SSortMergeJoinOperatorParam* pJoin0 = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam)); + SSortMergeJoinOperatorParam* pJoin1 = taosMemoryMalloc(sizeof(SSortMergeJoinOperatorParam)); + if (NULL == pJoin0 || NULL == pJoin1) { + taosMemoryFree(pJoin0); + taosMemoryFree(pJoin1); + return TSDB_CODE_OUT_OF_MEMORY; + } + + pJoin0->pChild = pChild0; + pJoin1->pChild = pChild1; + + SOperatorSpecParam specParam; + specParam.opType = QUERY_NODE_PHYSICAL_PLAN_MERGE_JOIN; + specParam.value = pJoin0; + taosArrayPush((*ppRes)->pOpParams, &specParam); + specParam.value = pJoin1; + taosArrayPush((*ppRes)->pOpParams, &specParam); + + return TSDB_CODE_SUCCESS; +} + + +static int32_t buildStbJoinOperatorParam(SDynQueryCtrlOperatorInfo* pInfo, SStbJoinPrevJoinCtx* pPrev, SOperatorParam** ppParam) { + int32_t rowIdx = pPrev->lastRow + 1; + SColumnInfoData* pVg0 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.vgSlot[0]); + SColumnInfoData* pVg1 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.vgSlot[1]); + SColumnInfoData* pUid0 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.uidSlot[0]); + SColumnInfoData* pUid1 = taosArrayGet(pPrev->pLastBlk->pDataBlock, pInfo->stbJoin.basic.uidSlot[1]); + SOperatorParam* pExcParam0 = NULL; + SOperatorParam* pExcParam1 = NULL; + SOperatorParam* pGcParam0 = NULL; + SOperatorParam* pGcParam1 = NULL; + + int32_t code = buildExchangeOperatorParam(&pExcParam0, pVg0->pData + pVg0->info.bytes * rowIdx, pUid0->pData + pUid0->info.bytes * rowIdx, NULL); + if (TSDB_CODE_SUCCESS == code) { + code = buildExchangeOperatorParam(&pExcParam1, pVg1->pData + pVg1->info.bytes * rowIdx, pUid1->pData + pUid1->info.bytes * rowIdx, NULL); + } + if (TSDB_CODE_SUCCESS == code) { + code = buildGroupCacheOperatorParam(&pGcParam0, 0, false, pUid0->pData + pUid0->info.bytes * rowIdx, pUid0->info.bytes, pExcParam0); + } + if (TSDB_CODE_SUCCESS == code) { + code = buildGroupCacheOperatorParam(&pGcParam1, 1, false, pUid1->pData + pUid1->info.bytes * rowIdx, pUid1->info.bytes, pExcParam1); + } + if (TSDB_CODE_SUCCESS == code) { + code = buildMergeJoinOperatorParam(ppParam, pGcParam0, pGcParam1); + } + return code; +} + +static void seqJoinLaunchPostJoin(SOperatorInfo* pOperator, SSDataBlock** ppRes) { SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; + SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; + SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; + SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post; + SOperatorParam* pParam = NULL; + + int32_t code = buildStbJoinOperatorParam(pInfo, pPrev, &pParam); + if (TSDB_CODE_SUCCESS != code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); + } + + *ppRes = pOperator->pDownstream[1]->fpSet.getNextExtFn(pOperator->pDownstream[1], pParam); + if (*ppRes) { + pPost->isStarted = true; + } +} + +static void seqJoinWithSeqRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; + SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; + SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; + while (true) { - SSDataBlock* pBlock = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]); + if ((pPrev->lastRow + 1) >= pPrev->pLastBlk->info.rows) { + *ppRes = NULL; + pPrev->pLastBlk = NULL; + return; + } + + seqJoinLaunchPostJoin(pOperator, ppRes); + pPrev->lastRow++; + if (*ppRes) { + break; + } + } +} + +static void seqJoinContinueRetrieve(SOperatorInfo* pOperator, SSDataBlock** ppRes) { + SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; + SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; + SStbJoinPrevJoinCtx* pPrev = &pStbJoin->ctx.prev; + SStbJoinPostJoinCtx* pPost = &pStbJoin->ctx.post; + + if (pPost->isStarted) { + *ppRes = getNextBlockFromDownstream(pOperator, 1); + if (NULL == *ppRes) { + pPost->isStarted = false; + } else { + return; + } + } + + if (pStbJoin->ctx.prev.pLastBlk) { + seqJoinWithSeqRetrieve(pOperator, ppRes); + } +} + +SSDataBlock* getResFromStbJoin(SOperatorInfo* pOperator) { + SDynQueryCtrlOperatorInfo* pInfo = pOperator->info; + SStbJoinDynCtrlInfo* pStbJoin = (SStbJoinDynCtrlInfo*)&pInfo->stbJoin; + SSDataBlock* pRes = NULL; + + seqJoinContinueRetrieve(pOperator, &pRes); + if (pRes) { + return pRes; + } + + while (true) { + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (NULL == pBlock) { break; } - addBlkToGroupCache(pOperator, pBlock, &pRes); + pStbJoin->ctx.prev.pLastBlk = pBlock; + pStbJoin->ctx.prev.lastRow = -1; + + seqJoinWithSeqRetrieve(pOperator, &pRes); + if (pRes) { + break; + } } + + return pRes; } SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SDynQueryCtrlPhysiNode* pPhyciNode, SExecTaskInfo* pTaskInfo) { SDynQueryCtrlOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SDynQueryCtrlOperatorInfo)); SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo)); + __optr_fn_t nextFp = NULL; int32_t code = TSDB_CODE_SUCCESS; if (pOperator == NULL || pInfo == NULL) { @@ -61,11 +260,21 @@ SOperatorInfo* createDynQueryCtrlOperatorInfo(SOperatorInfo** pDownstream, int32 goto _error; } - memcpy(&pInfo->ctrlInfo, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin)); + pInfo->qType = pPhyciNode->qType; + switch (pInfo->qType) { + case DYN_QTYPE_STB_HASH: + memcpy(&pInfo->stbJoin, &pPhyciNode->stbJoin, sizeof(pPhyciNode->stbJoin)); + nextFp = getResFromStbJoin; + break; + default: + qError("unsupported dynamic query ctrl type: %d", pInfo->qType); + code = TSDB_CODE_INVALID_PARA; + goto _error; + } setOperatorInfo(pOperator, "DynQueryCtrlOperator", QUERY_NODE_PHYSICAL_PLAN_DYN_QUERY_CTRL, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getBlockFromDynQueryCtrl, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn, NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, nextFp, NULL, destroyDynQueryCtrlOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); return pOperator; diff --git a/source/libs/executor/src/eventwindowoperator.c b/source/libs/executor/src/eventwindowoperator.c index 918c51d5d4..a22399e423 100644 --- a/source/libs/executor/src/eventwindowoperator.c +++ b/source/libs/executor/src/eventwindowoperator.c @@ -133,7 +133,7 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo setOperatorInfo(pOperator, "EventWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, eventWindowAggregate, NULL, destroyEWindowOperatorInfo, - optrDefaultBufFn, NULL, NULL, NULL); + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -193,7 +193,7 @@ static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { break; } diff --git a/source/libs/executor/src/exchangeoperator.c b/source/libs/executor/src/exchangeoperator.c index b3aa41a1a5..079712f7f1 100644 --- a/source/libs/executor/src/exchangeoperator.c +++ b/source/libs/executor/src/exchangeoperator.c @@ -40,6 +40,7 @@ typedef struct SSourceDataInfo { int32_t code; EX_SOURCE_STATUS status; const char* taskId; + SArray* pUidList; } SSourceDataInfo; static void destroyExchangeOperatorInfo(void* param); @@ -244,6 +245,10 @@ static int32_t initDataSource(int32_t numOfSources, SExchangeInfo* pInfo, const return TSDB_CODE_OUT_OF_MEMORY; } + if (pInfo->dynamicOp) { + return TSDB_CODE_SUCCESS; + } + for (int32_t i = 0; i < numOfSources; ++i) { SSourceDataInfo dataInfo = {0}; dataInfo.status = EX_SOURCE_DATA_NOT_READY; @@ -272,9 +277,17 @@ static int32_t initExchangeOperator(SExchangePhysiNode* pExNode, SExchangeInfo* return TSDB_CODE_OUT_OF_MEMORY; } + if (pExNode->node.dynamicOp) { + pInfo->pHashSources = tSimpleHashInit(numOfSources * 2, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT)); + if (NULL == pInfo->pHashSources) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + for (int32_t i = 0; i < numOfSources; ++i) { SDownstreamSourceNode* pNode = (SDownstreamSourceNode*)nodesListGetNode((SNodeList*)pExNode->pSrcEndPoints, i); taosArrayPush(pInfo->pSources, pNode); + tSimpleHashPut(pInfo->pHashSources, &pNode->addr.nodeId, sizeof(pNode->addr.nodeId), &i, sizeof(i)); } initLimitInfo(pExNode->node.pLimit, pExNode->node.pSlimit, &pInfo->limitInfo); @@ -290,6 +303,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode goto _error; } + pInfo->dynamicOp = pExNode->node.dynamicOp; int32_t code = initExchangeOperator(pExNode, pInfo, GET_TASKID(pTaskInfo)); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -316,7 +330,7 @@ SOperatorInfo* createExchangeOperatorInfo(void* pTransporter, SExchangePhysiNode } pOperator->fpSet = - createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); + createOperatorFpSet(prepareLoadRemoteData, loadRemoteData, NULL, destroyExchangeOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); return pOperator; _error: @@ -403,13 +417,15 @@ int32_t loadRemoteDataCallback(void* param, SDataBuf* pMsg, int32_t code) { } int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTaskInfo, int32_t sourceIndex) { - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); - - SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex); SSourceDataInfo* pDataInfo = taosArrayGet(pExchangeInfo->pSourceDataInfo, sourceIndex); - pDataInfo->startTime = taosGetTimestampUs(); + if (EX_SOURCE_DATA_NOT_READY != pDataInfo->status) { + return TSDB_CODE_SUCCESS; + } - ASSERT(pDataInfo->status == EX_SOURCE_DATA_NOT_READY); + pDataInfo->status = EX_SOURCE_DATA_STARTED; + SDownstreamSourceNode* pSource = taosArrayGet(pExchangeInfo->pSources, sourceIndex); + pDataInfo->startTime = taosGetTimestampUs(); + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); SFetchRspHandleWrapper* pWrapper = taosMemoryCalloc(1, sizeof(SFetchRspHandleWrapper)); pWrapper->exchangeId = pExchangeInfo->self; @@ -429,6 +445,9 @@ int32_t doSendFetchDataRequest(SExchangeInfo* pExchangeInfo, SExecTaskInfo* pTas req.taskId = pSource->taskId; req.queryId = pTaskInfo->id.queryId; req.execId = pSource->execId; + if (pDataInfo->pUidList) { + req.opParam = buildTableScanOperatorParam(pDataInfo->pUidList); + } int32_t msgSize = tSerializeSResFetchReq(NULL, 0, &req); if (msgSize < 0) { @@ -560,7 +579,7 @@ int32_t prepareConcurrentlyLoad(SOperatorInfo* pOperator) { SExchangeInfo* pExchangeInfo = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - size_t totalSources = taosArrayGetSize(pExchangeInfo->pSources); + size_t totalSources = taosArrayGetSize(pExchangeInfo->pSourceDataInfo); int64_t startTs = taosGetTimestampUs(); // Asynchronously send all fetch requests to all sources. @@ -693,14 +712,41 @@ _error: return code; } +int32_t addDynamicExchangeSource(SOperatorInfo* pOperator) { + SExchangeInfo* pExchangeInfo = pOperator->info; + SExchangeOperatorParam* pParam = (SExchangeOperatorParam*)pOperator->pOperatorParam; + int32_t* pIdx = tSimpleHashGet(pExchangeInfo->pHashSources, &pParam->vgId, sizeof(pParam->vgId)); + if (NULL == pIdx) { + qError("No exchange source for vgId: %d", pParam->vgId); + pOperator->pTaskInfo->code = TSDB_CODE_INVALID_PARA; + T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); + } + + SSourceDataInfo dataInfo = {0}; + dataInfo.status = EX_SOURCE_DATA_NOT_READY; + dataInfo.taskId = GET_TASKID(pOperator->pTaskInfo); + dataInfo.index = *pIdx; + dataInfo.pUidList = taosArrayDup(pParam->uidList, NULL); + taosArrayPush(pExchangeInfo->pSourceDataInfo, &dataInfo); +} + + int32_t prepareLoadRemoteData(SOperatorInfo* pOperator) { - if (OPTR_IS_OPENED(pOperator)) { + SExchangeInfo* pExchangeInfo = pOperator->info; + int32_t code = TSDB_CODE_SUCCESS; + if (OPTR_IS_OPENED(pOperator) && !pExchangeInfo->dynamicOp) { return TSDB_CODE_SUCCESS; } + if (pExchangeInfo->dynamicOp) { + code = addDynamicExchangeSource(pOperator); + if (code) { + return code; + } + } + int64_t st = taosGetTimestampUs(); - SExchangeInfo* pExchangeInfo = pOperator->info; if (!pExchangeInfo->seqLoadData) { int32_t code = prepareConcurrentlyLoad(pOperator); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/filloperator.c b/source/libs/executor/src/filloperator.c index 64e2bd4020..467ec499cb 100644 --- a/source/libs/executor/src/filloperator.c +++ b/source/libs/executor/src/filloperator.c @@ -197,7 +197,7 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) { } while (1) { - SSDataBlock* pBlock = pDownstream->fpSet.getNextFn(pDownstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { if (pInfo->totalInputRows == 0 && (pInfo->pFillInfo->type != TSDB_FILL_NULL_F && pInfo->pFillInfo->type != TSDB_FILL_SET_VALUE_F)) { @@ -444,7 +444,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = pInfo->numOfExpr; pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); + createOperatorFpSet(optrDummyOpenFn, doFill, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); return pOperator; @@ -1304,7 +1304,7 @@ static SSDataBlock* doStreamFill(SOperatorInfo* pOperator) { while (1) { if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows || pInfo->pSrcBlock->info.rows == 0) { // If there are delete datablocks, we receive them first. - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { pOperator->status = OP_RES_TO_RETURN; pInfo->pFillInfo->preRowKey = INT64_MIN; @@ -1561,7 +1561,7 @@ SOperatorInfo* createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFi setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); + createOperatorFpSet(optrDummyOpenFn, doStreamFill, NULL, destroyStreamFillOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/groupcacheoperator.c b/source/libs/executor/src/groupcacheoperator.c index 4822f1960e..b882145d92 100755 --- a/source/libs/executor/src/groupcacheoperator.c +++ b/source/libs/executor/src/groupcacheoperator.c @@ -147,7 +147,8 @@ static int32_t initGroupCacheSession(struct SOperatorInfo* pOperator, SGcOperato static void getFromSessionCache(struct SOperatorInfo* pOperator, SGroupCacheOperatorInfo* pGCache, SGcOperatorParam* pParam, SSDataBlock** ppRes, SGcSessionCtx** ppSession) { SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; - if (pParam->basic.newExec) { + SGcSessionCtx* pCtx = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId)); + if (NULL == pCtx) { int32_t code = initGroupCacheSession(pOperator, pParam, ppSession); if (TSDB_CODE_SUCCESS != code) { pTaskInfo->code = code; @@ -160,13 +161,7 @@ static void getFromSessionCache(struct SOperatorInfo* pOperator, SGroupCacheOper } return; } - - SGcSessionCtx* pCtx = tSimpleHashGet(pGCache->pSessionHash, &pParam->sessionId, sizeof(pParam->sessionId)); - if (NULL == pCtx) { - qError("session %" PRIx64 " not exists", pParam->sessionId); - pTaskInfo->code = TSDB_CODE_INVALID_PARA; - T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); - } + *ppSession = pCtx; if (pCtx->cacheHit) { @@ -198,16 +193,13 @@ static void addBlkToGroupCache(struct SOperatorInfo* pOperator, SSDataBlock* pBl *ppRes = pBlock; } -static SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator, SOperatorParam* param) { +SSDataBlock* getFromGroupCache(struct SOperatorInfo* pOperator) { SGroupCacheOperatorInfo* pGCache = pOperator->info; SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo; + SGcOperatorParam* pParam = (SGcOperatorParam*)pOperator->pOperatorParam; SGcSessionCtx* pSession = NULL; SSDataBlock* pRes = NULL; int32_t code = TSDB_CODE_SUCCESS; - SGcOperatorParam* pParam = getOperatorParam(pOperator->operatorType, param); - if (NULL == pParam) { - return NULL; - } getFromSessionCache(pOperator, pGCache, pParam, &pRes, &pSession); pGCache->pCurrent = pSession; @@ -274,7 +266,7 @@ SOperatorInfo* createGroupCacheOperatorInfo(SOperatorInfo** pDownstream, int32_t goto _error; } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, NULL, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, getFromGroupCache, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, getFromGroupCache, NULL, destroyGroupCacheOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); return pOperator; diff --git a/source/libs/executor/src/groupoperator.c b/source/libs/executor/src/groupoperator.c index d072b45e4e..5505faab7a 100644 --- a/source/libs/executor/src/groupoperator.c +++ b/source/libs/executor/src/groupoperator.c @@ -383,7 +383,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { break; } @@ -480,7 +480,7 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode* pInfo->binfo.outputTsOrder = pAggNode->node.outputTsOrder; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, hashGroupbyAggregate, NULL, destroyGroupOperatorInfo, - optrDefaultBufFn, NULL, NULL, NULL); + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -754,7 +754,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { break; } @@ -906,7 +906,7 @@ SOperatorInfo* createPartitionOperatorInfo(SOperatorInfo* downstream, SPartition pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); + createOperatorFpSet(optrDummyOpenFn, hashPartition, NULL, destroyPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -1110,7 +1110,7 @@ static SSDataBlock* doStreamHashPartition(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; { pInfo->pInputDataBlock = NULL; - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { setOperatorCompleted(pOperator); return NULL; @@ -1323,7 +1323,7 @@ SOperatorInfo* createStreamPartitionOperatorInfo(SOperatorInfo* downstream, SStr pOperator->exprSupp.numOfExprs = numOfCols; pOperator->exprSupp.pExprInfo = pExprInfo; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamHashPartition, NULL, - destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); + destroyStreamPartitionOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); initParDownStream(downstream, &pInfo->partitionSup, &pInfo->scalarSup); code = appendDownstream(pOperator, &downstream, 1); diff --git a/source/libs/executor/src/hashjoinoperator.c b/source/libs/executor/src/hashjoinoperator.c index b700eec374..1e3b78719c 100755 --- a/source/libs/executor/src/hashjoinoperator.c +++ b/source/libs/executor/src/hashjoinoperator.c @@ -188,6 +188,9 @@ static void setHJoinBuildAndProbeTable(SHJoinOperatorInfo* pInfo, SHashJoinPhysi pInfo->pBuild = &pInfo->tbs[buildIdx]; pInfo->pProbe = &pInfo->tbs[probeIdx]; + + pInfo->pBuild->downStreamIdx = buildIdx; + pInfo->pProbe->downStreamIdx = probeIdx; } static int32_t buildHJoinResColMap(SHJoinOperatorInfo* pInfo, SHashJoinPhysiNode* pJoinNode) { @@ -630,12 +633,13 @@ static int32_t addBlockRowsToHash(SSDataBlock* pBlock, SHJoinOperatorInfo* pJoin return code; } -static int32_t buildHJoinKeyHash(SHJoinOperatorInfo* pJoin) { +static int32_t buildHJoinKeyHash(struct SOperatorInfo* pOperator) { + SHJoinOperatorInfo* pJoin = pOperator->info; SSDataBlock* pBlock = NULL; int32_t code = TSDB_CODE_SUCCESS; while (true) { - pBlock = pJoin->pBuild->downStream->fpSet.getNextFn(pJoin->pBuild->downStream); + pBlock = getNextBlockFromDownstream(pOperator, pJoin->pBuild->downStreamIdx) if (NULL == pBlock) { break; } @@ -690,7 +694,7 @@ static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { } if (NULL == pJoin->pKeyHash) { - code = buildHJoinKeyHash(pJoin); + code = buildHJoinKeyHash(pOperator); if (code) { pTaskInfo->code = code; T_LONG_JMP(pTaskInfo->env, code); @@ -714,7 +718,7 @@ static SSDataBlock* doHashJoin(struct SOperatorInfo* pOperator) { } while (true) { - SSDataBlock* pBlock = pJoin->pProbe->downStream->fpSet.getNextFn(pJoin->pProbe->downStream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, pJoin->pProbe->downStreamIdx); if (NULL == pBlock) { setHJoinDone(pOperator); break; @@ -815,7 +819,7 @@ SOperatorInfo* createHashJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t n goto _error; } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doHashJoin, NULL, destroyHashJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); return pOperator; diff --git a/source/libs/executor/src/mergejoinoperator.c b/source/libs/executor/src/mergejoinoperator.c index 1e010f5ae0..653503fb39 100644 --- a/source/libs/executor/src/mergejoinoperator.c +++ b/source/libs/executor/src/mergejoinoperator.c @@ -271,7 +271,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY); pInfo->rightBuildTable = tSimpleHashInit(256, hashFn); } - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeJoin, NULL, destroyMergeJoinOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, pDownstream, numOfDownstream); if (code != TSDB_CODE_SUCCESS) { goto _error; @@ -416,7 +416,7 @@ static int32_t mergeJoinGetDownStreamRowsEqualTimeStamp(SOperatorInfo* pOperator mergeJoinGetBlockRowsEqualTs(dataBlock, tsSlotId, startPos, timestamp, &endPos, rowLocations, createdBlocks); while (endPos == dataBlock->info.rows) { SOperatorInfo* ds = pOperator->pDownstream[whichChild]; - dataBlock = ds->fpSet.getNextFn(ds); + dataBlock = getNextBlockFromDownstream(pOperator, whichChild); if (whichChild == 0) { pJoinInfo->leftPos = 0; pJoinInfo->pLeft = dataBlock; @@ -616,8 +616,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs SMJoinOperatorInfo* pJoinInfo = pOperator->info; if (pJoinInfo->pLeft == NULL || pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) { - SOperatorInfo* ds1 = pOperator->pDownstream[0]; - pJoinInfo->pLeft = ds1->fpSet.getNextFn(ds1); + pJoinInfo->pLeft = getNextBlockFromDownstream(pOperator, 0); pJoinInfo->leftPos = 0; if (pJoinInfo->pLeft == NULL) { @@ -627,8 +626,7 @@ static bool mergeJoinGetNextTimestamp(SOperatorInfo* pOperator, int64_t* pLeftTs } if (pJoinInfo->pRight == NULL || pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) { - SOperatorInfo* ds2 = pOperator->pDownstream[1]; - pJoinInfo->pRight = ds2->fpSet.getNextFn(ds2); + pJoinInfo->pRight = getNextBlockFromDownstream(pOperator, 1); pJoinInfo->rightPos = 0; if (pJoinInfo->pRight == NULL) { @@ -715,3 +713,6 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { } return (pRes->info.rows > 0) ? pRes : NULL; } + + + diff --git a/source/libs/executor/src/operator.c b/source/libs/executor/src/operator.c index 32035db0a4..2643332f58 100644 --- a/source/libs/executor/src/operator.c +++ b/source/libs/executor/src/operator.c @@ -87,6 +87,16 @@ int32_t optrDefaultBufFn(SOperatorInfo* pOperator) { } } +SSDataBlock* optrDefaultGetNextExtFn(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { + pOperator->pOperatorParam = getOperatorParam(pOperator->operatorType, pParam, 0); + int32_t code = setOperatorParams(pOperator, pOperator->pOperatorParam ? pOperator->pOperatorParam->pChild : NULL); + if (TSDB_CODE_SUCCESS != code) { + pOperator->pTaskInfo->code = code; + T_LONG_JMP(pOperator->pTaskInfo->env, pOperator->pTaskInfo->code); + } + return pOperator->fpSet.getNextFn(pOperator); +} + static int64_t getQuerySupportBufSize(size_t numOfTables) { size_t s1 = sizeof(STableQueryInfo); // size_t s3 = sizeof(STableCheckInfo); buffer consumption in tsdb @@ -592,16 +602,54 @@ int32_t getOperatorExplainExecInfo(SOperatorInfo* operatorInfo, SArray* pExecInf return TSDB_CODE_SUCCESS; } -void *getOperatorParam(int32_t opType, SOperatorParam* param) { +void *getOperatorParam(int32_t opType, SOperatorParam* param, int32_t idx) { if (NULL == param) { return NULL; } - for (int32_t i = 0; i < param->opNum; ++i) { - if (opType == param->pOpParams[i].opType) { - memcpy(¶m->pOpParams[i], param, sizeof(param->basic)); - return ¶m->pOpParams[i]; - } + int32_t opNum = taosArrayGetSize(param->pOpParams); + if (idx >= opNum) { + return NULL; + } + SOperatorSpecParam *pSpecParam = taosArrayGet(param->pOpParams, idx); + if (opType == pSpecParam->opType) { + return pSpecParam->value; } return NULL; } +int32_t setOperatorParams(struct SOperatorInfo* pOperator, SOperatorParam* pParam) { + if (NULL == pParam) { + taosMemoryFreeClear(pOperator->pDownstreamParams); + return TSDB_CODE_SUCCESS; + } + + if (NULL == pOperator->pDownstreamParams) { + pOperator->pDownstreamParams = taosMemoryCalloc(pOperator->numOfDownstream, POINTER_BYTES); + if (NULL == pOperator->pDownstreamParams) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + + SOperatorBaseParam* pBaseParam = NULL; + + for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) { + pBaseParam = getOperatorParam(pOperator->operatorType, pParam, i); + if (pBaseParam) { + pOperator->pDownstreamParams[i] = pBaseParam->pChild; + } else { + pOperator->pDownstreamParams[i] = pParam; + } + } + + return TSDB_CODE_SUCCESS; +} + +FORCE_INLINE SSDataBlock* getNextBlockFromDownstream(struct SOperatorInfo* pOperator, int32_t idx) { + if (pOperator->pDownstreamParams && pOperator->pDownstreamParams[idx]) { + return pOperator->pDownstream[idx]->fpSet.getNextExtFn(pOperator->pDownstream[idx], pOperator->pDownstreamParams[idx]); + } + + return pOperator->pDownstream[idx]->fpSet.getNextFn(pOperator->pDownstream[idx]); +} + + diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index e2dfebe818..6c5731461e 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -135,7 +135,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys setOperatorInfo(pOperator, "ProjectOperator", QUERY_NODE_PHYSICAL_PLAN_PROJECT, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doProjectOperation, NULL, destroyProjectOperatorInfo, - optrDefaultBufFn, NULL, NULL, NULL); + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -260,7 +260,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { blockDataCleanup(pRes); // The downstream exec may change the value of the newgroup, so use a local variable instead. - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { qDebug("set op close, exec %d, status %d rows %" PRId64 , pTaskInfo->execModel, pOperator->status, pFinalRes->info.rows); setOperatorCompleted(pOperator); @@ -416,7 +416,7 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy setOperatorInfo(pOperator, "IndefinitOperator", QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doApplyIndefinitFunction, NULL, destroyIndefinitOperatorInfo, - optrDefaultBufFn, NULL, NULL, NULL); + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -501,7 +501,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) { if (pInfo->pRes->info.rows < pOperator->resultInfo.threshold) { while (1) { // The downstream exec may change the value of the newgroup, so use a local variable instead. - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { setOperatorCompleted(pOperator); break; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 5dd0383fe9..41f14ac39b 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -981,7 +981,7 @@ SOperatorInfo* createTableScanOperatorInfo(STableScanPhysiNode* pTableScanNode, taosLRUCacheSetStrictCapacity(pInfo->base.metaCache.pTableMetaEntryCache, false); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScan, NULL, destroyTableScanOperatorInfo, - optrDefaultBufFn, getTableScannerExecInfo, NULL, NULL); + optrDefaultBufFn, getTableScannerExecInfo, optrDefaultGetNextExtFn, NULL); // for non-blocking operator, the open cost is always 0 pOperator->cost.openCost = 0; @@ -1006,7 +1006,7 @@ SOperatorInfo* createTableSeqScanOperatorInfo(void* pReadHandle, SExecTaskInfo* setOperatorInfo(pOperator, "TableSeqScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SEQ_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableScanImpl, NULL, NULL, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); return pOperator; } @@ -2252,7 +2252,7 @@ SOperatorInfo* createRawScanOperatorInfo(SReadHandle* pHandle, SExecTaskInfo* pT setOperatorInfo(pOperator, "RawScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); - pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); + pOperator->fpSet = createOperatorFpSet(NULL, doRawScan, NULL, destroyRawScanOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); return pOperator; _end: @@ -2474,7 +2474,7 @@ SOperatorInfo* createStreamScanOperatorInfo(SReadHandle* pHandle, STableScanPhys __optr_fn_t nextFn = (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) ? doStreamScan : doQueueScan; pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); + createOperatorFpSet(optrDummyOpenFn, nextFn, NULL, destroyStreamScanOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); return pOperator; @@ -2639,7 +2639,7 @@ SOperatorInfo* createTagScanOperatorInfo(SReadHandle* pReadHandle, STagScanPhysi blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); + createOperatorFpSet(optrDummyOpenFn, doTagScan, NULL, destroyTagScanOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); return pOperator; @@ -3094,7 +3094,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN pOperator->exprSupp.numOfExprs = numOfCols; pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableMergeScan, NULL, destroyTableMergeScanOperatorInfo, - optrDefaultBufFn, getTableMergeScanExplainExecInfo, NULL, NULL); + optrDefaultBufFn, getTableMergeScanExplainExecInfo, optrDefaultGetNextExtFn, NULL); pOperator->cost.openCost = 0; return pOperator; @@ -3244,7 +3244,7 @@ SOperatorInfo* createTableCountScanOperatorInfo(SReadHandle* readHandle, STableC setOperatorInfo(pOperator, "TableCountScanOperator", QUERY_NODE_PHYSICAL_PLAN_TABLE_COUNT_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTableCountScan, NULL, destoryTableCountScanOperator, - optrDefaultBufFn, NULL, NULL, NULL); + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); return pOperator; _error: diff --git a/source/libs/executor/src/sortoperator.c b/source/libs/executor/src/sortoperator.c index 8a5ec610f8..1e4329ab70 100644 --- a/source/libs/executor/src/sortoperator.c +++ b/source/libs/executor/src/sortoperator.c @@ -87,7 +87,7 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode* // TODO dynamic set the available sort buffer pOperator->fpSet = - createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, NULL, NULL); + createOperatorFpSet(doOpenSortOperator, doSort, NULL, destroySortOperatorInfo, optrDefaultBufFn, getExplainExecInfo, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -457,7 +457,7 @@ SSDataBlock* doGroupSort(SOperatorInfo* pOperator) { if (!pInfo->hasGroupId) { pInfo->hasGroupId = true; - pInfo->prefetchedSortInput = pOperator->pDownstream[0]->fpSet.getNextFn(pOperator->pDownstream[0]); + pInfo->prefetchedSortInput = getNextBlockFromDownstream(pOperator, 0); if (pInfo->prefetchedSortInput == NULL) { setOperatorCompleted(pOperator); return NULL; @@ -552,7 +552,7 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort pInfo->pSortInfo = createSortInfo(pSortPhyNode->pSortKeys); setOperatorInfo(pOperator, "GroupSortOperator", QUERY_NODE_PHYSICAL_PLAN_GROUP_SORT, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doGroupSort, NULL, destroyGroupSortOperatorInfo, - optrDefaultBufFn, getGroupSortExplainExecInfo, NULL, NULL); + optrDefaultBufFn, getGroupSortExplainExecInfo, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -841,7 +841,7 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size setOperatorInfo(pOperator, "MultiwayMergeOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(openMultiwayMergeOperator, doMultiwayMerge, NULL, - destroyMultiwayMergeOperatorInfo, optrDefaultBufFn, getMultiwayMergeExplainExecInfo, NULL, NULL); + destroyMultiwayMergeOperatorInfo, optrDefaultBufFn, getMultiwayMergeExplainExecInfo, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, downStreams, numStreams); if (code != TSDB_CODE_SUCCESS) { diff --git a/source/libs/executor/src/sysscanoperator.c b/source/libs/executor/src/sysscanoperator.c index cf238266b3..f69e3ba6b9 100644 --- a/source/libs/executor/src/sysscanoperator.c +++ b/source/libs/executor/src/sysscanoperator.c @@ -1783,7 +1783,7 @@ SOperatorInfo* createSysTableScanOperatorInfo(void* readHandle, SSystemTableScan pInfo, pTaskInfo); pOperator->exprSupp.numOfExprs = taosArrayGetSize(pInfo->pRes->pDataBlock); pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL, NULL, NULL); + createOperatorFpSet(optrDummyOpenFn, doSysTableScan, NULL, destroySysScanOperator, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); return pOperator; _error: @@ -2320,7 +2320,7 @@ SOperatorInfo* createDataBlockInfoScanOperator(SReadHandle* readHandle, SBlockDi setOperatorInfo(pOperator, "DataBlockDistScanOperator", QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doBlockInfoScan, NULL, destroyBlockDistScanOperatorInfo, - optrDefaultBufFn, NULL, NULL, NULL); + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); return pOperator; _error: diff --git a/source/libs/executor/src/timesliceoperator.c b/source/libs/executor/src/timesliceoperator.c index 06daa2c201..d0f9b94a0a 100644 --- a/source/libs/executor/src/timesliceoperator.c +++ b/source/libs/executor/src/timesliceoperator.c @@ -887,7 +887,7 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) { } while (1) { - SSDataBlock* pBlock = pSliceInfo->pRemainRes ? pSliceInfo->pRemainRes : downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = pSliceInfo->pRemainRes ? pSliceInfo->pRemainRes : getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { setOperatorCompleted(pOperator); break; @@ -998,7 +998,7 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = - createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); + createOperatorFpSet(optrDummyOpenFn, doTimeslice, NULL, destroyTimeSliceOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity); diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 3e1b19ca1c..3a32f3f5c9 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -1026,7 +1026,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) { int64_t st = taosGetTimestampUs(); while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { break; } @@ -1162,7 +1162,7 @@ static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { break; } @@ -1684,7 +1684,7 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(doOpenIntervalAgg, doBuildIntervalResult, NULL, destroyIntervalOperatorInfo, - optrDefaultBufFn, NULL, NULL, NULL); + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -1807,7 +1807,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) { SOperatorInfo* downstream = pOperator->pDownstream[0]; while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { break; } @@ -1907,7 +1907,7 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi setOperatorInfo(pOperator, "StateWindowOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(openStateWindowAggOptr, doStateWindowAgg, NULL, destroyStateWindowOperatorInfo, - optrDefaultBufFn, NULL, NULL, NULL); + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -1983,7 +1983,7 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW setOperatorInfo(pOperator, "SessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doSessionWindowAgg, NULL, destroySWindowOperatorInfo, - optrDefaultBufFn, NULL, NULL, NULL); + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); pOperator->pTaskInfo = pTaskInfo; code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -2572,7 +2572,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { T_LONG_JMP(pTaskInfo->env, pTaskInfo->code); } - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { pOperator->status = OP_RES_TO_RETURN; qDebug("===stream===return data:%s. recv datablock num:%" PRIu64, @@ -2834,7 +2834,7 @@ SOperatorInfo* createStreamFinalIntervalOperatorInfo(SOperatorInfo* downstream, pOperator->info = pInfo; pOperator->fpSet = createOperatorFpSet(NULL, doStreamFinalIntervalAgg, NULL, destroyStreamFinalIntervalOperatorInfo, - optrDefaultBufFn, NULL, NULL, NULL); + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); if (pPhyNode->type == QUERY_NODE_PHYSICAL_PLAN_STREAM_SEMI_INTERVAL) { initIntervalDownStream(downstream, pPhyNode->type, pInfo); } @@ -3509,7 +3509,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) { pInfo->pStUpdated = tSimpleHashInit(64, hashFn); } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { break; } @@ -3670,7 +3670,7 @@ SOperatorInfo* createStreamSessionAggOperatorInfo(SOperatorInfo* downstream, SPh setOperatorInfo(pOperator, "StreamSessionWindowAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_SESSION, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionAgg, NULL, destroyStreamSessionAggOperatorInfo, - optrDefaultBufFn, NULL, NULL, NULL); + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); if (downstream) { initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); @@ -3735,7 +3735,7 @@ static SSDataBlock* doStreamSessionSemiAgg(SOperatorInfo* pOperator) { pInfo->pStUpdated = tSimpleHashInit(64, hashFn); } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { clearSpecialDataBlock(pInfo->pUpdateRes); pOperator->status = OP_RES_TO_RETURN; @@ -3826,7 +3826,7 @@ SOperatorInfo* createStreamFinalSessionAggOperatorInfo(SOperatorInfo* downstream pInfo->pUpdateRes = createSpecialDataBlock(STREAM_CLEAR); blockDataEnsureCapacity(pInfo->pUpdateRes, 128); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamSessionSemiAgg, NULL, - destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); + destroyStreamSessionAggOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); } setOperatorInfo(pOperator, name, pPhyNode->type, false, OP_NOT_OPENED, pInfo, pTaskInfo); @@ -4072,7 +4072,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) { pInfo->pSeUpdated = tSimpleHashInit(64, hashFn); } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { break; } @@ -4203,7 +4203,7 @@ SOperatorInfo* createStreamStateAggOperatorInfo(SOperatorInfo* downstream, SPhys setOperatorInfo(pOperator, "StreamStateAggOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_STATE, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamStateAgg, NULL, destroyStreamStateOperatorInfo, - optrDefaultBufFn, NULL, NULL, NULL); + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); initDownStream(downstream, &pInfo->streamAggSup, pOperator->operatorType, pInfo->primaryTsIndex, &pInfo->twAggSup); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -4335,7 +4335,7 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) { while (1) { SSDataBlock* pBlock = NULL; if (pMiaInfo->prefetchedBlock == NULL) { - pBlock = downstream->fpSet.getNextFn(downstream); + pBlock = getNextBlockFromDownstream(pOperator, 0); } else { pBlock = pMiaInfo->prefetchedBlock; pMiaInfo->prefetchedBlock = NULL; @@ -4484,7 +4484,7 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream, false, OP_NOT_OPENED, miaInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, mergeAlignedIntervalAgg, NULL, destroyMAIOperatorInfo, - optrDefaultBufFn, NULL, NULL, NULL); + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -4665,7 +4665,7 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) { while (1) { SSDataBlock* pBlock = NULL; if (miaInfo->prefetchedBlock == NULL) { - pBlock = downstream->fpSet.getNextFn(downstream); + pBlock = getNextBlockFromDownstream(pOperator, 0); } else { pBlock = miaInfo->prefetchedBlock; miaInfo->groupId = pBlock->info.id.groupId; @@ -4771,7 +4771,7 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge setOperatorInfo(pOperator, "TimeMergeIntervalAggOperator", QUERY_NODE_PHYSICAL_PLAN_MERGE_INTERVAL, false, OP_NOT_OPENED, pMergeIntervalInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doMergeIntervalAgg, NULL, destroyMergeIntervalOperatorInfo, - optrDefaultBufFn, NULL, NULL, NULL); + optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); code = appendDownstream(pOperator, &downstream, 1); if (code != TSDB_CODE_SUCCESS) { @@ -4842,7 +4842,7 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) { } while (1) { - SSDataBlock* pBlock = downstream->fpSet.getNextFn(downstream); + SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0); if (pBlock == NULL) { qDebug("===stream===return data:single interval. recv datablock num:%" PRIu64, pInfo->numOfDatapack); pInfo->numOfDatapack = 0; @@ -5027,7 +5027,7 @@ SOperatorInfo* createStreamIntervalOperatorInfo(SOperatorInfo* downstream, SPhys setOperatorInfo(pOperator, "StreamIntervalOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL, true, OP_NOT_OPENED, pInfo, pTaskInfo); pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamIntervalAgg, NULL, - destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, NULL, NULL); + destroyStreamFinalIntervalOperatorInfo, optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL); pInfo->statestore = pTaskInfo->storageAPI.stateStore; pInfo->recvGetAll = false; diff --git a/source/libs/planner/src/planSpliter.c b/source/libs/planner/src/planSpliter.c index 0d7ca3dd6d..b7f10b9df3 100644 --- a/source/libs/planner/src/planSpliter.c +++ b/source/libs/planner/src/planSpliter.c @@ -108,6 +108,7 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE pExchange->srcStartGroupId = pCxt->groupId; pExchange->srcEndGroupId = pCxt->groupId; pExchange->node.precision = pChild->precision; + pExchange->node.dynamicOp = pChild->dynamicOp; pExchange->node.pTargets = nodesCloneList(pChild->pTargets); if (NULL == pExchange->node.pTargets) { return TSDB_CODE_OUT_OF_MEMORY;