fix: add merge join

This commit is contained in:
dapan1121 2024-10-10 11:32:37 +08:00
parent ff5e7556c4
commit 01a4256d0a
1 changed files with 64 additions and 82 deletions

View File

@ -1106,8 +1106,9 @@ void qptMakeNonRealTableNode(SNode** ppNode) {
}
SNode* qptMakeRandNode(SNode** ppNode) {
nodesMakeNode((ENodeType)taosRand(), ppNode);
return *ppNode;
SNode* pNode = NULL;
nodesMakeNode((ENodeType)taosRand(), ppNode ? ppNode : &pNode);
return ppNode ? *ppNode : pNode;
}
SNode* qptMakeExprNode(SNode** ppNode) {
@ -1179,7 +1180,7 @@ SNode* qptMakeConditionNode(bool onlyTag) {
}
SNode* qptMakeDataBlockDescNode() {
if (!qptCtx.param.correctExpected && QPT_LOW_PROB()) {
if (QPT_NCORRECT_LOW_PROB()) {
return NULL;
}
@ -1192,6 +1193,41 @@ SNode* qptMakeDataBlockDescNode() {
return (SNode*)pDesc;
}
SNode* qptMakeSlotDescNode(const char* pName, const SNode* pNode, int16_t slotId, bool output, bool reserve) {
SSlotDescNode* pSlot = NULL;
if (QPT_NCORRECT_LOW_PROB) {
return qptMakeRandNode((SNode**)&pSlot);
}
assert(0 == nodesMakeNode(QUERY_NODE_SLOT_DESC, (SNode**)&pSlot));
QPT_RAND_BOOL_V ? (pSlot->name[0] = 0) : snprintf(pSlot->name, sizeof(pSlot->name), "%s", pName);
pSlot->slotId = QPT_CORRECT_HIGH_PROB() ? slotId : taosRand();
if (QPT_CORRECT_HIGH_PROB()) {
pSlot->dataType = ((SExprNode*)pNode)->resType;
} else {
qptGetRandValue(&pSlot->dataType.type, &pSlot->dataType.bytes, NULL);
}
pSlot->reserve = reserve;
pSlot->output = output;
return (SNode*)pSlot;
}
void qptMakeTargetNode(SNode* pNode, int16_t dataBlockId, int16_t slotId, SNode** pOutput) {
if (QPT_NCORRECT_LOW_PROB) {
return qptMakeRandNode(pOutput);
}
STargetNode* pTarget = NULL;
assert(0 == nodesMakeNode(QUERY_NODE_TARGET, (SNode**)&pTarget));
pTarget->dataBlockId = QPT_CORRECT_HIGH_PROB() ? dataBlockId : taosRand();
pTarget->slotId = QPT_CORRECT_HIGH_PROB() ? slotId : taosRand();
pTarget->pExpr = QPT_CORRECT_HIGH_PROB() ? pNode : qptMakeRandNode(NULL);
*pOutput = (SNode*)pTarget;
}
SPhysiNode* qptCreatePhysiNode(int32_t nodeType) {
SPhysiNode* pPhysiNode = NULL;
@ -1274,33 +1310,6 @@ void qptCreateTableScanPseudoCols( SScanPhysiNode* pScanPhysiNode) {
qptCreateTableScanColsImpl(pScanPhysiNode, &pScanPhysiNode->pScanPseudoCols, qptCtx.param.tbl.tagNum, qptCtx.param.tbl.pTag);
}
SNode* qptMakeSlotDescNode(const char* pName, const SNode* pNode, int16_t slotId, bool output, bool reserve) {
SSlotDescNode* pSlot = NULL;
assert(0 == nodesMakeNode(QUERY_NODE_SLOT_DESC, (SNode**)&pSlot));
QPT_RAND_BOOL_V ? (pSlot->name[0] = 0) : snprintf(pSlot->name, sizeof(pSlot->name), "%s", pName);
pSlot->slotId = qptCtx.param.correctExpected ? slotId : taosRand();
if (qptCtx.param.correctExpected) {
pSlot->dataType = ((SExprNode*)pNode)->resType;
} else {
qptGetRandValue(&pSlot->dataType.type, &pSlot->dataType.bytes, NULL);
}
pSlot->reserve = reserve;
pSlot->output = output;
return (SNode*)pSlot;
}
void qptMakeTargetNode(SNode* pNode, int16_t dataBlockId, int16_t slotId, SNode** pOutput) {
STargetNode* pTarget = NULL;
assert(0 == nodesMakeNode(QUERY_NODE_TARGET, (SNode**)&pTarget));
pTarget->dataBlockId = dataBlockId;
pTarget->slotId = slotId;
pTarget->pExpr = pNode;
*pOutput = (SNode*)pTarget;
}
void qptAddDataBlockSlots(SNodeList* pList, SDataBlockDescNode* pDataBlockDesc) {
if (NULL == pDataBlockDesc) {
@ -1392,6 +1401,26 @@ void qptMakeExprList(SNodeList** ppList) {
}
}
void qptMakeColumnList(SNodeList** ppList) {
int32_t colNum = taosRand() % QPT_MAX_COLUMN_NUM + (QPT_CORRECT_HIGH_PROB() ? 1 : 0);
qptResetMakeNodeCtx(qptCtx.buildCtx.pChild ? qptCtx.buildCtx.pChild->pOutputDataBlockDesc : NULL, false);
for (int32_t i = 0; i < colNum; ++i) {
SNode* pNode = NULL;
qptMakeColumnNode(&pNode);
qptNodesListMakeStrictAppend(ppList, pNode);
}
}
void qptMakeTargetList(int16_t datablockId, SNodeList** ppList) {
int32_t tarNum = taosRand() % QPT_MAX_COLUMN_NUM + (QPT_CORRECT_HIGH_PROB() ? 1 : 0);
for (int32_t i = 0; i < tarNum; ++i) {
SNode* pNode = NULL, *pExpr = NULL;
qptMakeColumnNode(&pExpr);
qptMakeTargetNode(pExpr, datablockId, i, &pNode);
qptNodesListMakeStrictAppend(ppList, pNode);
}
}
SNode* qptCreateProjectPhysiNode(int32_t nodeType) {
SPhysiNode* pPhysiNode = qptCreatePhysiNode(nodeType);
@ -1432,61 +1461,14 @@ SNode* qptCreateSortMergeJoinPhysiNode(int32_t nodeType) {
qptMakeExprNode(&pJoin->pColEqCond);
qptMakeExprNode(&pJoin->pColOnCond);
qptMakeExprNode(&pJoin->pFullOnCond);
qptMakeTargetNode(SNode * pNode, int16_t dataBlockId, int16_t slotId, SNode * * pOutput)
qptMakeTargetList(pPhysiNode->pOutputDataBlockDesc ? pPhysiNode->pOutputDataBlockDesc->dataBlockId, &pJoin->pTargets);
for (int32_t i = 0; i < 2; i++) {
pJoin->inputStat[i].inputRowNum = taosRand();
pJoin->inputStat[i].inputRowSize = taosRand();
}
pJoin->seqWinGroup = QPT_RAND_BOOL_V;
pJoin->grpJoin = QPT_RAND_BOOL_V;
pJoin->leftPrimSlotId = JT_PRIM_TS_SLOT_ID;
pJoin->rightPrimSlotId = JT_PRIM_TS_SLOT_ID;
pJoin->node.inputTsOrder = param->asc ? ORDER_ASC : ORDER_DESC;
if (JOIN_STYPE_WIN == pJoin->subType) {
SWindowOffsetNode* pOffset = NULL;
code = nodesMakeNode(QUERY_NODE_WINDOW_OFFSET, (SNode**)&pOffset);
assert(pOffset);
SValueNode* pStart = NULL;
code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pStart);
assert(pStart);
SValueNode* pEnd = NULL;
code = nodesMakeNode(QUERY_NODE_VALUE, (SNode**)&pEnd);
assert(pEnd);
pStart->node.resType.type = TSDB_DATA_TYPE_BIGINT;
pStart->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
pStart->datum.i = (taosRand() % 2) ? (((int32_t)-1) * (int64_t)(taosRand() % JT_MAX_WINDOW_OFFSET)) : (taosRand() % JT_MAX_WINDOW_OFFSET);
pEnd->node.resType.type = TSDB_DATA_TYPE_BIGINT;
pEnd->node.resType.bytes = tDataTypes[TSDB_DATA_TYPE_BIGINT].bytes;
pEnd->datum.i = (taosRand() % 2) ? (((int32_t)-1) * (int64_t)(taosRand() % JT_MAX_WINDOW_OFFSET)) : (taosRand() % JT_MAX_WINDOW_OFFSET);
if (pStart->datum.i > pEnd->datum.i) {
TSWAP(pStart->datum.i, pEnd->datum.i);
}
pOffset->pStartOffset = (SNode*)pStart;
pOffset->pEndOffset = (SNode*)pEnd;
pJoin->pWindowOffset = (SNode*)pOffset;
jtCtx.winStartOffset = pStart->datum.i;
jtCtx.winEndOffset = pEnd->datum.i;
}
jtCtx.grpJoin = param->grpJoin;
jtCtx.joinType = param->joinType;
jtCtx.subType = param->subType;
jtCtx.asc = param->asc;
jtCtx.jLimit = param->jLimit;
jtCtx.asofOpType = param->asofOp;
jtCtx.leftColOnly = (JOIN_TYPE_LEFT == param->joinType && JOIN_STYPE_SEMI == param->subType);
jtCtx.rightColOnly = (JOIN_TYPE_RIGHT == param->joinType && JOIN_STYPE_SEMI == param->subType);
jtCtx.inGrpId = 1;
createColCond(pJoin, param->cond);
createFilterStart(pJoin, param->filter);
createTargetSlotList(pJoin);
createColEqCondEnd(pJoin);
createColOnCondEnd(pJoin);
createFilterEnd(pJoin, param->filter);
updateColRowInfo();
createBlockDescNode(&pJoin->node.pOutputDataBlockDesc);
return (SNode*)pPhysiNode;
}