From 8196b6ee9ac79a74587d504b2e816386b6eb233c Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Thu, 10 Mar 2022 18:07:07 +0800 Subject: [PATCH] [td-13039] refactor. --- source/libs/executor/inc/executorimpl.h | 2 +- source/libs/executor/src/executorimpl.c | 28 +++++++++++++++++-------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 0e0a5f9333..e95457b91e 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -632,7 +632,7 @@ typedef struct SOrderOperatorInfo { uint64_t totalElapsed; // total elapsed time } SOrderOperatorInfo; -SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo); +SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput, int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo); SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv); diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 30a5bd3f80..bc257f797a 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -5181,10 +5181,8 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) { #endif } -// TODO remove it -static SSDataBlock* createResultDataBlock(const SArray* pExprInfo); - -SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) { +// TODO handle the error +SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) { SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo)); SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo)); @@ -5196,8 +5194,19 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, const SArra } size_t numOfSources = LIST_LENGTH(pSources); + pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode)); + if (pInfo->pSources == NULL) { + tfree(pInfo); + tfree(pOperator); + terrno = TSDB_CODE_QRY_OUT_OF_MEMORY; + return NULL; + } + + for(int32_t i = 0; i < numOfSources; ++i) { + SNodeListNode* pNode = nodesListGetNode((SNodeList*) pSources, i); + taosArrayPush(pInfo->pSources, pNode); + } -// pInfo->pSources = taosArrayDup(pSources); pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo)); if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) { tfree(pInfo); @@ -5217,8 +5226,8 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, const SArra taosArrayPush(pInfo->pSourceDataInfo, &dataInfo); } - size_t size = taosArrayGetSize(pExprInfo); - pInfo->pResult = createResultDataBlock(pExprInfo); + size_t size = pBlock->info.numOfCols; + pInfo->pResult = pBlock; pInfo->seqLoadData = true; tsem_init(&pInfo->ready, 0, 0); @@ -8105,8 +8114,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count, pScanPhyNode->reverse, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) { - SExchangePhysiNode* pEx = (SExchangePhysiNode*)pPhyNode; - return createExchangeOperatorInfo(pEx->pSrcEndPoints, NULL, pTaskInfo); + SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode; + SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc); + return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) { SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table. STableGroupInfo groupInfo = {0};