diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index d587e201ef..f5a1f34366 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -224,6 +224,7 @@ typedef struct SOperatorInfo { struct SOperatorInfo** pDownstream; // downstram pointer list int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator SOperatorFpSet fpSet; + int16_t resultDataBlockId; } SOperatorInfo; typedef enum { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 9dd028eeb7..2dbe4c184c 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -3934,6 +3934,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo int32_t type = nodeType(pPhyNode); if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) { + SOperatorInfo* pOperator = NULL; if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; @@ -3951,11 +3952,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); + pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; - return pOperator; - } else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) { STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode; int32_t code = @@ -3972,14 +3971,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo); + pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo); STableScanInfo* pScanInfo = pOperator->info; pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder; - return pOperator; - } else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) { - return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); + pOperator = createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) { STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode; if (pHandle->vnode) { @@ -4001,12 +3998,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo #endif pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan); - SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo); - return pOperator; - + pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) { SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode; - return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo); + pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) { STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode; int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTagCond, pTagIndexCond, pTableListInfo); @@ -4015,7 +4010,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); + pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) { SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode; pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo)); @@ -4041,7 +4036,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, ""); cleanupQueryTableDataCond(&cond); - return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo); + pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) { SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode; @@ -4058,12 +4053,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo return NULL; } - return createLastrowScanOperator(pScanNode, pHandle, pTaskInfo); + pOperator = createLastrowScanOperator(pScanNode, pHandle, pTaskInfo); } else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) { - return createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo); + pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo); } else { ASSERT(0); } + pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId; + return pOperator; } int32_t num = 0; @@ -4075,6 +4072,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser); if (ops[i] == NULL) { return NULL; + } else { + ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId; } } @@ -4217,8 +4216,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo } else { ASSERT(0); } - taosMemoryFree(ops); + + pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId; return pOptr; } diff --git a/source/libs/executor/src/joinoperator.c b/source/libs/executor/src/joinoperator.c index 4134ce5dbf..659940c696 100644 --- a/source/libs/executor/src/joinoperator.c +++ b/source/libs/executor/src/joinoperator.c @@ -26,7 +26,33 @@ static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode); static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator); static void destroyMergeJoinOperator(void* param, int32_t numOfOutput); -static void extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode); +static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream, + SSortMergeJoinPhysiNode* pJoinNode); + +static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode) { + SNode* pMergeCondition = pJoinNode->pMergeCondition; + if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) { + SOperatorNode* pNode = (SOperatorNode*)pMergeCondition; + SColumnNode* col1 = (SColumnNode*)pNode->pLeft; + SColumnNode* col2 = (SColumnNode*)pNode->pRight; + SColumnNode* leftTsCol = NULL; + SColumnNode* rightTsCol = NULL; + if (col1->dataBlockId == pDownstream[0]->resultDataBlockId) { + ASSERT(col2->dataBlockId == pDownstream[1]->resultDataBlockId); + leftTsCol = col1; + rightTsCol = col2; + } else { + ASSERT(col1->dataBlockId == pDownstream[1]->resultDataBlockId); + ASSERT(col2->dataBlockId == pDownstream[0]->resultDataBlockId); + leftTsCol = col2; + rightTsCol = col1; + } + setJoinColumnInfo(&pInfo->leftCol, leftTsCol); + setJoinColumnInfo(&pInfo->rightCol, rightTsCol); + } else { + ASSERT(false); + } +} SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) { @@ -53,14 +79,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t pOperator->info = pInfo; pOperator->pTaskInfo = pTaskInfo; - SNode* pMergeCondition = pJoinNode->pMergeCondition; - if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) { - SOperatorNode* pNode = (SOperatorNode*)pMergeCondition; - setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft); - setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight); - } else { - ASSERT(false); - } + extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode); if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) { pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION); @@ -367,17 +386,3 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) { } return (pRes->info.rows > 0) ? pRes : NULL; } - -static void extractTimeCondition(SJoinOperatorInfo* pInfo, SLogicConditionNode* pLogicConditionNode) { - int32_t len = LIST_LENGTH(pLogicConditionNode->pParameterList); - - for (int32_t i = 0; i < len; ++i) { - SNode* pNode = nodesListGetNode(pLogicConditionNode->pParameterList, i); - if (nodeType(pNode) == QUERY_NODE_OPERATOR) { - SOperatorNode* pn1 = (SOperatorNode*)pNode; - setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pn1->pLeft); - setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pn1->pRight); - break; - } - } -}