fix: add input ts order to physical plan join node
This commit is contained in:
parent
b941513479
commit
3469117732
|
@ -83,8 +83,6 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
|
||||||
} else if (pJoinNode->inputTsOrder == ORDER_DESC) {
|
} else if (pJoinNode->inputTsOrder == ORDER_DESC) {
|
||||||
pInfo->inputTsOrder = TSDB_ORDER_DESC;
|
pInfo->inputTsOrder = TSDB_ORDER_DESC;
|
||||||
}
|
}
|
||||||
//TODO: remove this when JoinNode inputTsOrder is ready
|
|
||||||
pInfo->inputTsOrder = TSDB_ORDER_ASC;
|
|
||||||
|
|
||||||
pOperator->fpSet =
|
pOperator->fpSet =
|
||||||
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
|
createOperatorFpSet(operatorDummyOpenFn, doMergeJoin, NULL, NULL, destroyMergeJoinOperator, NULL, NULL, NULL);
|
||||||
|
@ -209,15 +207,13 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
|
||||||
pJoinInfo->rightPos += 1;
|
pJoinInfo->rightPos += 1;
|
||||||
|
|
||||||
nrows += 1;
|
nrows += 1;
|
||||||
} else if (asc && leftTs < rightTs ||
|
} else if (asc && leftTs < rightTs || !asc && leftTs > rightTs) {
|
||||||
!asc && leftTs > rightTs) {
|
|
||||||
pJoinInfo->leftPos += 1;
|
pJoinInfo->leftPos += 1;
|
||||||
|
|
||||||
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
|
if (pJoinInfo->leftPos >= pJoinInfo->pLeft->info.rows) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
} else if (asc && leftTs > rightTs ||
|
} else if (asc && leftTs > rightTs || !asc && leftTs < rightTs) {
|
||||||
!asc && leftTs < rightTs) {
|
|
||||||
pJoinInfo->rightPos += 1;
|
pJoinInfo->rightPos += 1;
|
||||||
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
|
if (pJoinInfo->rightPos >= pJoinInfo->pRight->info.rows) {
|
||||||
continue;
|
continue;
|
||||||
|
|
|
@ -1712,6 +1712,7 @@ static int32_t jsonToPhysiProjectNode(const SJson* pJson, void* pObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
static const char* jkJoinPhysiPlanJoinType = "JoinType";
|
static const char* jkJoinPhysiPlanJoinType = "JoinType";
|
||||||
|
static const char* jkJoinPhysiPlanInputTsOrder = "InputTsOrder";
|
||||||
static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition";
|
static const char* jkJoinPhysiPlanMergeCondition = "MergeCondition";
|
||||||
static const char* jkJoinPhysiPlanOnConditions = "OnConditions";
|
static const char* jkJoinPhysiPlanOnConditions = "OnConditions";
|
||||||
static const char* jkJoinPhysiPlanTargets = "Targets";
|
static const char* jkJoinPhysiPlanTargets = "Targets";
|
||||||
|
@ -1723,6 +1724,9 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType);
|
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanJoinType, pNode->joinType);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = tjsonAddIntegerToObject(pJson, jkJoinPhysiPlanInputTsOrder, pNode->inputTsOrder);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = tjsonAddObject(pJson, jkJoinPhysiPlanMergeCondition, nodeToJson, pNode->pMergeCondition);
|
code = tjsonAddObject(pJson, jkJoinPhysiPlanMergeCondition, nodeToJson, pNode->pMergeCondition);
|
||||||
}
|
}
|
||||||
|
@ -1742,7 +1746,9 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) {
|
||||||
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
|
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code);
|
tjsonGetNumberValue(pJson, jkJoinPhysiPlanJoinType, pNode->joinType, code);
|
||||||
;
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
tjsonGetNumberValue(pJson, jkJoinPhysiPlanInputTsOrder, pNode->inputTsOrder, code);
|
||||||
}
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOnConditions);
|
code = jsonToNodeObject(pJson, jkJoinPhysiPlanOnConditions, &pNode->pOnConditions);
|
||||||
|
|
|
@ -632,6 +632,7 @@ static int32_t createJoinPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
|
||||||
pJoin->joinType = pJoinLogicNode->joinType;
|
pJoin->joinType = pJoinLogicNode->joinType;
|
||||||
|
pJoin->inputTsOrder = pJoinLogicNode->inputTsOrder;
|
||||||
setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pMergeCondition,
|
setNodeSlotId(pCxt, pLeftDesc->dataBlockId, pRightDesc->dataBlockId, pJoinLogicNode->pMergeCondition,
|
||||||
&pJoin->pMergeCondition);
|
&pJoin->pMergeCondition);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
|
Loading…
Reference in New Issue