fix: extract time condition corretly when join tables together
This commit is contained in:
parent
5b8207556c
commit
2a17deec7c
|
@ -224,6 +224,7 @@ typedef struct SOperatorInfo {
|
||||||
struct SOperatorInfo** pDownstream; // downstram pointer list
|
struct SOperatorInfo** pDownstream; // downstram pointer list
|
||||||
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
|
int32_t numOfDownstream; // number of downstream. The value is always ONE expect for join operator
|
||||||
SOperatorFpSet fpSet;
|
SOperatorFpSet fpSet;
|
||||||
|
int16_t resultDataBlockId;
|
||||||
} SOperatorInfo;
|
} SOperatorInfo;
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
|
|
@ -3934,6 +3934,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
int32_t type = nodeType(pPhyNode);
|
int32_t type = nodeType(pPhyNode);
|
||||||
|
|
||||||
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
if (pPhyNode->pChildren == NULL || LIST_LENGTH(pPhyNode->pChildren) == 0) {
|
||||||
|
SOperatorInfo* pOperator = NULL;
|
||||||
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
if (QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN == type) {
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
|
@ -3951,11 +3952,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
|
pOperator = createTableScanOperatorInfo(pTableScanNode, pHandle, pTaskInfo);
|
||||||
STableScanInfo* pScanInfo = pOperator->info;
|
STableScanInfo* pScanInfo = pOperator->info;
|
||||||
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||||
return pOperator;
|
|
||||||
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_TABLE_MERGE_SCAN == type) {
|
||||||
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
STableMergeScanPhysiNode* pTableScanNode = (STableMergeScanPhysiNode*)pPhyNode;
|
||||||
int32_t code =
|
int32_t code =
|
||||||
|
@ -3972,14 +3971,12 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
SOperatorInfo* pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo);
|
pOperator = createTableMergeScanOperatorInfo(pTableScanNode, pTableListInfo, pHandle, pTaskInfo);
|
||||||
|
|
||||||
STableScanInfo* pScanInfo = pOperator->info;
|
STableScanInfo* pScanInfo = pOperator->info;
|
||||||
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
pTaskInfo->cost.pRecoder = &pScanInfo->readRecorder;
|
||||||
return pOperator;
|
|
||||||
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == type) {
|
||||||
return createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
|
pOperator = createExchangeOperatorInfo(pHandle->pMsgCb->clientRpc, (SExchangePhysiNode*)pPhyNode, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN == type) {
|
||||||
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
STableScanPhysiNode* pTableScanNode = (STableScanPhysiNode*)pPhyNode;
|
||||||
if (pHandle->vnode) {
|
if (pHandle->vnode) {
|
||||||
|
@ -4001,12 +3998,10 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
|
pTaskInfo->schemaInfo.qsw = extractQueriedColumnSchema(&pTableScanNode->scan);
|
||||||
SOperatorInfo* pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
|
pOperator = createStreamScanOperatorInfo(pHandle, pTableScanNode, pTagCond, pTaskInfo);
|
||||||
return pOperator;
|
|
||||||
|
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_SYSTABLE_SCAN == type) {
|
||||||
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
SSystemTableScanPhysiNode* pSysScanPhyNode = (SSystemTableScanPhysiNode*)pPhyNode;
|
||||||
return createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
|
pOperator = createSysTableScanOperatorInfo(pHandle, pSysScanPhyNode, pUser, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_TAG_SCAN == type) {
|
||||||
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
|
STagScanPhysiNode* pScanPhyNode = (STagScanPhysiNode*)pPhyNode;
|
||||||
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTagCond, pTagIndexCond, pTableListInfo);
|
int32_t code = getTableList(pHandle->meta, pHandle->vnode, pScanPhyNode, pTagCond, pTagIndexCond, pTableListInfo);
|
||||||
|
@ -4015,7 +4010,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
|
pOperator = createTagScanOperatorInfo(pHandle, pScanPhyNode, pTableListInfo, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_BLOCK_DIST_SCAN == type) {
|
||||||
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
|
SBlockDistScanPhysiNode* pBlockNode = (SBlockDistScanPhysiNode*)pPhyNode;
|
||||||
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
|
pTableListInfo->pTableList = taosArrayInit(4, sizeof(STableKeyInfo));
|
||||||
|
@ -4041,7 +4036,7 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, "");
|
tsdbReaderOpen(pHandle->vnode, &cond, pTableListInfo->pTableList, &pReader, "");
|
||||||
cleanupQueryTableDataCond(&cond);
|
cleanupQueryTableDataCond(&cond);
|
||||||
|
|
||||||
return createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
|
pOperator = createDataBlockInfoScanOperator(pReader, pHandle, cond.suid, pBlockNode, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_LAST_ROW_SCAN == type) {
|
||||||
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
|
SLastRowScanPhysiNode* pScanNode = (SLastRowScanPhysiNode*)pPhyNode;
|
||||||
|
|
||||||
|
@ -4058,12 +4053,14 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
return createLastrowScanOperator(pScanNode, pHandle, pTaskInfo);
|
pOperator = createLastrowScanOperator(pScanNode, pHandle, pTaskInfo);
|
||||||
} else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
|
} else if (QUERY_NODE_PHYSICAL_PLAN_PROJECT == type) {
|
||||||
return createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
|
pOperator = createProjectOperatorInfo(NULL, (SProjectPhysiNode*)pPhyNode, pTaskInfo);
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
pOperator->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
|
||||||
|
return pOperator;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t num = 0;
|
int32_t num = 0;
|
||||||
|
@ -4075,6 +4072,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser);
|
ops[i] = createOperatorTree(pChildNode, pTaskInfo, pHandle, pTableListInfo, pTagCond, pTagIndexCond, pUser);
|
||||||
if (ops[i] == NULL) {
|
if (ops[i] == NULL) {
|
||||||
return NULL;
|
return NULL;
|
||||||
|
} else {
|
||||||
|
ops[i]->resultDataBlockId = pChildNode->pOutputDataBlockDesc->dataBlockId;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4217,8 +4216,9 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
|
||||||
} else {
|
} else {
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosMemoryFree(ops);
|
taosMemoryFree(ops);
|
||||||
|
|
||||||
|
pOptr->resultDataBlockId = pPhyNode->pOutputDataBlockDesc->dataBlockId;
|
||||||
return pOptr;
|
return pOptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -26,7 +26,33 @@
|
||||||
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
|
static void setJoinColumnInfo(SColumnInfo* pColumn, const SColumnNode* pColumnNode);
|
||||||
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
|
static SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator);
|
||||||
static void destroyMergeJoinOperator(void* param, int32_t numOfOutput);
|
static void destroyMergeJoinOperator(void* param, int32_t numOfOutput);
|
||||||
static void extractTimeCondition(SJoinOperatorInfo* Info, SLogicConditionNode* pLogicConditionNode);
|
static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream,
|
||||||
|
SSortMergeJoinPhysiNode* pJoinNode);
|
||||||
|
|
||||||
|
static void extractTimeCondition(SJoinOperatorInfo* pInfo, SOperatorInfo** pDownstream, int32_t numOfDownstream, SSortMergeJoinPhysiNode* pJoinNode) {
|
||||||
|
SNode* pMergeCondition = pJoinNode->pMergeCondition;
|
||||||
|
if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) {
|
||||||
|
SOperatorNode* pNode = (SOperatorNode*)pMergeCondition;
|
||||||
|
SColumnNode* col1 = (SColumnNode*)pNode->pLeft;
|
||||||
|
SColumnNode* col2 = (SColumnNode*)pNode->pRight;
|
||||||
|
SColumnNode* leftTsCol = NULL;
|
||||||
|
SColumnNode* rightTsCol = NULL;
|
||||||
|
if (col1->dataBlockId == pDownstream[0]->resultDataBlockId) {
|
||||||
|
ASSERT(col2->dataBlockId == pDownstream[1]->resultDataBlockId);
|
||||||
|
leftTsCol = col1;
|
||||||
|
rightTsCol = col2;
|
||||||
|
} else {
|
||||||
|
ASSERT(col1->dataBlockId == pDownstream[1]->resultDataBlockId);
|
||||||
|
ASSERT(col2->dataBlockId == pDownstream[0]->resultDataBlockId);
|
||||||
|
leftTsCol = col2;
|
||||||
|
rightTsCol = col1;
|
||||||
|
}
|
||||||
|
setJoinColumnInfo(&pInfo->leftCol, leftTsCol);
|
||||||
|
setJoinColumnInfo(&pInfo->rightCol, rightTsCol);
|
||||||
|
} else {
|
||||||
|
ASSERT(false);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
|
SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t numOfDownstream,
|
||||||
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
|
SSortMergeJoinPhysiNode* pJoinNode, SExecTaskInfo* pTaskInfo) {
|
||||||
|
@ -53,14 +79,7 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
|
||||||
pOperator->info = pInfo;
|
pOperator->info = pInfo;
|
||||||
pOperator->pTaskInfo = pTaskInfo;
|
pOperator->pTaskInfo = pTaskInfo;
|
||||||
|
|
||||||
SNode* pMergeCondition = pJoinNode->pMergeCondition;
|
extractTimeCondition(pInfo, pDownstream, numOfDownstream, pJoinNode);
|
||||||
if (nodeType(pMergeCondition) == QUERY_NODE_OPERATOR) {
|
|
||||||
SOperatorNode* pNode = (SOperatorNode*)pMergeCondition;
|
|
||||||
setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pNode->pLeft);
|
|
||||||
setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pNode->pRight);
|
|
||||||
} else {
|
|
||||||
ASSERT(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) {
|
if (pJoinNode->pOnConditions != NULL && pJoinNode->node.pConditions != NULL) {
|
||||||
pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
|
pInfo->pCondAfterMerge = nodesMakeNode(QUERY_NODE_LOGIC_CONDITION);
|
||||||
|
@ -367,17 +386,3 @@ SSDataBlock* doMergeJoin(struct SOperatorInfo* pOperator) {
|
||||||
}
|
}
|
||||||
return (pRes->info.rows > 0) ? pRes : NULL;
|
return (pRes->info.rows > 0) ? pRes : NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void extractTimeCondition(SJoinOperatorInfo* pInfo, SLogicConditionNode* pLogicConditionNode) {
|
|
||||||
int32_t len = LIST_LENGTH(pLogicConditionNode->pParameterList);
|
|
||||||
|
|
||||||
for (int32_t i = 0; i < len; ++i) {
|
|
||||||
SNode* pNode = nodesListGetNode(pLogicConditionNode->pParameterList, i);
|
|
||||||
if (nodeType(pNode) == QUERY_NODE_OPERATOR) {
|
|
||||||
SOperatorNode* pn1 = (SOperatorNode*)pNode;
|
|
||||||
setJoinColumnInfo(&pInfo->leftCol, (SColumnNode*)pn1->pLeft);
|
|
||||||
setJoinColumnInfo(&pInfo->rightCol, (SColumnNode*)pn1->pRight);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
Loading…
Reference in New Issue