[td-13039] refactor.

This commit is contained in:
Haojun Liao 2022-03-10 18:07:07 +08:00
parent 8ee10d50da
commit 8196b6ee9a
2 changed files with 20 additions and 10 deletions

View File

@ -632,7 +632,7 @@ typedef struct SOrderOperatorInfo {
uint64_t totalElapsed; // total elapsed time
} SOrderOperatorInfo;
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableScanOperatorInfo(void* pTsdbReadHandle, int32_t order, int32_t numOfOutput,
int32_t repeatTime, int32_t reverseTime, SExecTaskInfo* pTaskInfo);
SOperatorInfo* createTableSeqScanOperatorInfo(void* pTsdbReadHandle, STaskRuntimeEnv* pRuntimeEnv);

View File

@ -5181,10 +5181,8 @@ static SSDataBlock* doLoadRemoteData(void* param, bool* newgroup) {
#endif
}
// TODO remove it
static SSDataBlock* createResultDataBlock(const SArray* pExprInfo);
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, const SArray* pExprInfo, SExecTaskInfo* pTaskInfo) {
// TODO handle the error
SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo) {
SExchangeInfo* pInfo = calloc(1, sizeof(SExchangeInfo));
SOperatorInfo* pOperator = calloc(1, sizeof(SOperatorInfo));
@ -5196,8 +5194,19 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, const SArra
}
size_t numOfSources = LIST_LENGTH(pSources);
pInfo->pSources = taosArrayInit(numOfSources, sizeof(SDownstreamSourceNode));
if (pInfo->pSources == NULL) {
tfree(pInfo);
tfree(pOperator);
terrno = TSDB_CODE_QRY_OUT_OF_MEMORY;
return NULL;
}
for(int32_t i = 0; i < numOfSources; ++i) {
SNodeListNode* pNode = nodesListGetNode((SNodeList*) pSources, i);
taosArrayPush(pInfo->pSources, pNode);
}
// pInfo->pSources = taosArrayDup(pSources);
pInfo->pSourceDataInfo = taosArrayInit(numOfSources, sizeof(SSourceDataInfo));
if (pInfo->pSourceDataInfo == NULL || pInfo->pSources == NULL) {
tfree(pInfo);
@ -5217,8 +5226,8 @@ SOperatorInfo* createExchangeOperatorInfo(const SNodeList* pSources, const SArra
taosArrayPush(pInfo->pSourceDataInfo, &dataInfo);
}
size_t size = taosArrayGetSize(pExprInfo);
pInfo->pResult = createResultDataBlock(pExprInfo);
size_t size = pBlock->info.numOfCols;
pInfo->pResult = pBlock;
pInfo->seqLoadData = true;
tsem_init(&pInfo->ready, 0, 0);
@ -8105,8 +8114,9 @@ SOperatorInfo* doCreateOperatorTreeNode(SPhysiNode* pPhyNode, SExecTaskInfo* pTa
return createTableScanOperatorInfo(pDataReader, pScanPhyNode->order, numOfCols, pScanPhyNode->count,
pScanPhyNode->reverse, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pPhyNode)) {
SExchangePhysiNode* pEx = (SExchangePhysiNode*)pPhyNode;
return createExchangeOperatorInfo(pEx->pSrcEndPoints, NULL, pTaskInfo);
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pPhyNode;
SSDataBlock* pResBlock = createOutputBuf_rv1(pExchange->node.pOutputDataBlockDesc);
return createExchangeOperatorInfo(pExchange->pSrcEndPoints, pResBlock, pTaskInfo);
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == nodeType(pPhyNode)) {
SScanPhysiNode* pScanPhyNode = (SScanPhysiNode*)pPhyNode; // simple child table.
STableGroupInfo groupInfo = {0};