Merge pull request #17018 from taosdata/fix/3.0_bugfix_wxy
fix: subplans under set operator use different group ids
This commit is contained in:
commit
778aa44fc2
|
@ -165,7 +165,8 @@ typedef struct SVnodeModifyLogicNode {
|
|||
|
||||
typedef struct SExchangeLogicNode {
|
||||
SLogicNode node;
|
||||
int32_t srcGroupId;
|
||||
int32_t srcStartGroupId;
|
||||
int32_t srcEndGroupId;
|
||||
} SExchangeLogicNode;
|
||||
|
||||
typedef struct SMergeLogicNode {
|
||||
|
@ -399,7 +400,10 @@ typedef struct SDownstreamSourceNode {
|
|||
|
||||
typedef struct SExchangePhysiNode {
|
||||
SPhysiNode node;
|
||||
int32_t srcGroupId; // group id of datasource suplans
|
||||
// for set operators, there will be multiple execution groups under one exchange, and the ids of these execution
|
||||
// groups are consecutive
|
||||
int32_t srcStartGroupId;
|
||||
int32_t srcEndGroupId;
|
||||
bool singleChannel;
|
||||
SNodeList* pSrcEndPoints; // element is SDownstreamSource, scheduler fill by calling qSetSuplanExecutionNode
|
||||
} SExchangePhysiNode;
|
||||
|
|
|
@ -764,9 +764,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
|
||||
SExchangePhysiNode *pExchNode = (SExchangePhysiNode *)pNode;
|
||||
SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcGroupId, sizeof(pExchNode->srcGroupId));
|
||||
SExplainGroup *group = taosHashGet(ctx->groupHash, &pExchNode->srcStartGroupId, sizeof(pExchNode->srcStartGroupId));
|
||||
if (NULL == group) {
|
||||
qError("exchange src group %d not in groupHash", pExchNode->srcGroupId);
|
||||
qError("exchange src group %d not in groupHash", pExchNode->srcStartGroupId);
|
||||
QRY_ERR_RET(TSDB_CODE_QRY_APP_ERROR);
|
||||
}
|
||||
|
||||
|
@ -801,7 +801,7 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
|
|||
}
|
||||
}
|
||||
|
||||
QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcGroupId, level + 1));
|
||||
QRY_ERR_RET(qExplainAppendGroupResRows(ctx, pExchNode->srcStartGroupId, level + 1));
|
||||
break;
|
||||
}
|
||||
case QUERY_NODE_PHYSICAL_PLAN_SORT: {
|
||||
|
|
|
@ -429,7 +429,8 @@ static int32_t logicVnodeModifCopy(const SVnodeModifyLogicNode* pSrc, SVnodeModi
|
|||
|
||||
static int32_t logicExchangeCopy(const SExchangeLogicNode* pSrc, SExchangeLogicNode* pDst) {
|
||||
COPY_BASE_OBJECT_FIELD(node, logicNodeCopy);
|
||||
COPY_SCALAR_FIELD(srcGroupId);
|
||||
COPY_SCALAR_FIELD(srcStartGroupId);
|
||||
COPY_SCALAR_FIELD(srcEndGroupId);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
|
|
@ -726,14 +726,18 @@ static int32_t jsonToLogicVnodeModifyNode(const SJson* pJson, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static const char* jkExchangeLogicPlanSrcGroupId = "SrcGroupId";
|
||||
static const char* jkExchangeLogicPlanSrcStartGroupId = "SrcStartGroupId";
|
||||
static const char* jkExchangeLogicPlanSrcEndGroupId = "SrcEndGroupId";
|
||||
|
||||
static int32_t logicExchangeNodeToJson(const void* pObj, SJson* pJson) {
|
||||
const SExchangeLogicNode* pNode = (const SExchangeLogicNode*)pObj;
|
||||
|
||||
int32_t code = logicPlanNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcGroupId, pNode->srcGroupId);
|
||||
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcStartGroupId, pNode->srcStartGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkExchangeLogicPlanSrcEndGroupId, pNode->srcEndGroupId);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -744,7 +748,10 @@ static int32_t jsonToLogicExchangeNode(const SJson* pJson, void* pObj) {
|
|||
|
||||
int32_t code = jsonToLogicPlanNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcGroupId, &pNode->srcGroupId);
|
||||
code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcStartGroupId, &pNode->srcStartGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkExchangeLogicPlanSrcEndGroupId, &pNode->srcEndGroupId);
|
||||
}
|
||||
|
||||
return code;
|
||||
|
@ -1837,7 +1844,8 @@ static int32_t jsonToPhysiAggNode(const SJson* pJson, void* pObj) {
|
|||
return code;
|
||||
}
|
||||
|
||||
static const char* jkExchangePhysiPlanSrcGroupId = "SrcGroupId";
|
||||
static const char* jkExchangePhysiPlanSrcStartGroupId = "SrcStartGroupId";
|
||||
static const char* jkExchangePhysiPlanSrcEndGroupId = "SrcEndGroupId";
|
||||
static const char* jkExchangePhysiPlanSrcEndPoints = "SrcEndPoints";
|
||||
|
||||
static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
|
||||
|
@ -1845,7 +1853,10 @@ static int32_t physiExchangeNodeToJson(const void* pObj, SJson* pJson) {
|
|||
|
||||
int32_t code = physicPlanNodeToJson(pObj, pJson);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcGroupId, pNode->srcGroupId);
|
||||
code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcStartGroupId, pNode->srcStartGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonAddIntegerToObject(pJson, jkExchangePhysiPlanSrcEndGroupId, pNode->srcEndGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodeListToJson(pJson, jkExchangePhysiPlanSrcEndPoints, pNode->pSrcEndPoints);
|
||||
|
@ -1859,7 +1870,10 @@ static int32_t jsonToPhysiExchangeNode(const SJson* pJson, void* pObj) {
|
|||
|
||||
int32_t code = jsonToPhysicPlanNode(pJson, pObj);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcGroupId, &pNode->srcGroupId);
|
||||
code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcStartGroupId, &pNode->srcStartGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tjsonGetIntValue(pJson, jkExchangePhysiPlanSrcEndGroupId, &pNode->srcEndGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = jsonToNodeList(pJson, jkExchangePhysiPlanSrcEndPoints, &pNode->pSrcEndPoints);
|
||||
|
|
|
@ -2380,7 +2380,8 @@ static int32_t msgToPhysiAggNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
|
||||
enum {
|
||||
PHY_EXCHANGE_CODE_BASE_NODE = 1,
|
||||
PHY_EXCHANGE_CODE_SRC_GROUP_ID,
|
||||
PHY_EXCHANGE_CODE_SRC_START_GROUP_ID,
|
||||
PHY_EXCHANGE_CODE_SRC_END_GROUP_ID,
|
||||
PHY_EXCHANGE_CODE_SINGLE_CHANNEL,
|
||||
PHY_EXCHANGE_CODE_SRC_ENDPOINTS
|
||||
};
|
||||
|
@ -2390,7 +2391,10 @@ static int32_t physiExchangeNodeToMsg(const void* pObj, STlvEncoder* pEncoder) {
|
|||
|
||||
int32_t code = tlvEncodeObj(pEncoder, PHY_EXCHANGE_CODE_BASE_NODE, physiNodeToMsg, &pNode->node);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_GROUP_ID, pNode->srcGroupId);
|
||||
code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_START_GROUP_ID, pNode->srcStartGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeI32(pEncoder, PHY_EXCHANGE_CODE_SRC_END_GROUP_ID, pNode->srcEndGroupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = tlvEncodeBool(pEncoder, PHY_EXCHANGE_CODE_SINGLE_CHANNEL, pNode->singleChannel);
|
||||
|
@ -2412,8 +2416,11 @@ static int32_t msgToPhysiExchangeNode(STlvDecoder* pDecoder, void* pObj) {
|
|||
case PHY_EXCHANGE_CODE_BASE_NODE:
|
||||
code = tlvDecodeObjFromTlv(pTlv, msgToPhysiNode, &pNode->node);
|
||||
break;
|
||||
case PHY_EXCHANGE_CODE_SRC_GROUP_ID:
|
||||
code = tlvDecodeI32(pTlv, &pNode->srcGroupId);
|
||||
case PHY_EXCHANGE_CODE_SRC_START_GROUP_ID:
|
||||
code = tlvDecodeI32(pTlv, &pNode->srcStartGroupId);
|
||||
break;
|
||||
case PHY_EXCHANGE_CODE_SRC_END_GROUP_ID:
|
||||
code = tlvDecodeI32(pTlv, &pNode->srcEndGroupId);
|
||||
break;
|
||||
case PHY_EXCHANGE_CODE_SINGLE_CHANNEL:
|
||||
code = tlvDecodeBool(pTlv, &pNode->singleChannel);
|
||||
|
|
|
@ -1046,7 +1046,8 @@ static int32_t doCreateExchangePhysiNode(SPhysiPlanContext* pCxt, SExchangeLogic
|
|||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
pExchange->srcGroupId = pExchangeLogicNode->srcGroupId;
|
||||
pExchange->srcStartGroupId = pExchangeLogicNode->srcStartGroupId;
|
||||
pExchange->srcEndGroupId = pExchangeLogicNode->srcEndGroupId;
|
||||
*pPhyNode = (SPhysiNode*)pExchange;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -1425,7 +1426,8 @@ static int32_t createExchangePhysiNodeByMerge(SMergePhysiNode* pMerge) {
|
|||
if (NULL == pExchange) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pExchange->srcGroupId = pMerge->srcGroupId;
|
||||
pExchange->srcStartGroupId = pMerge->srcGroupId;
|
||||
pExchange->srcEndGroupId = pMerge->srcGroupId;
|
||||
pExchange->singleChannel = true;
|
||||
pExchange->node.pParent = (SPhysiNode*)pMerge;
|
||||
pExchange->node.pOutputDataBlockDesc = (SDataBlockDescNode*)nodesCloneNode((SNode*)pMerge->node.pOutputDataBlockDesc);
|
||||
|
|
|
@ -84,7 +84,8 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE
|
|||
if (NULL == pExchange) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pExchange->srcGroupId = pCxt->groupId;
|
||||
pExchange->srcStartGroupId = pCxt->groupId;
|
||||
pExchange->srcEndGroupId = pCxt->groupId;
|
||||
pExchange->node.precision = pChild->precision;
|
||||
pExchange->node.pTargets = nodesCloneList(pChild->pTargets);
|
||||
if (NULL == pExchange->node.pTargets) {
|
||||
|
@ -112,7 +113,8 @@ static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubpla
|
|||
|
||||
static bool splIsChildSubplan(SLogicNode* pLogicNode, int32_t groupId) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pLogicNode)) {
|
||||
return ((SExchangeLogicNode*)pLogicNode)->srcGroupId == groupId;
|
||||
return groupId >= ((SExchangeLogicNode*)pLogicNode)->srcStartGroupId &&
|
||||
groupId <= ((SExchangeLogicNode*)pLogicNode)->srcEndGroupId;
|
||||
}
|
||||
|
||||
if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pLogicNode)) {
|
||||
|
@ -1184,6 +1186,7 @@ static int32_t unionSplitSubplan(SSplitContext* pCxt, SLogicSubplan* pUnionSubpl
|
|||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
++(pCxt->groupId);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
nodesDestroyList(pSubplanChildren);
|
||||
|
@ -1207,12 +1210,14 @@ static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan,
|
|||
return false;
|
||||
}
|
||||
|
||||
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) {
|
||||
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
|
||||
SProjectLogicNode* pProject) {
|
||||
SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
|
||||
if (NULL == pExchange) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pExchange->srcGroupId = pCxt->groupId;
|
||||
pExchange->srcStartGroupId = startGroupId;
|
||||
pExchange->srcEndGroupId = pCxt->groupId - 1;
|
||||
pExchange->node.precision = pProject->node.precision;
|
||||
pExchange->node.pTargets = nodesCloneList(pProject->node.pTargets);
|
||||
if (NULL == pExchange->node.pTargets) {
|
||||
|
@ -1246,11 +1251,11 @@ static int32_t unionAllSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t startGroupId = pCxt->groupId;
|
||||
int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pProject);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = unAllSplCreateExchangeNode(pCxt, info.pSubplan, info.pProject);
|
||||
code = unAllSplCreateExchangeNode(pCxt, startGroupId, info.pSubplan, info.pProject);
|
||||
}
|
||||
++(pCxt->groupId);
|
||||
pCxt->split = true;
|
||||
return code;
|
||||
}
|
||||
|
@ -1260,12 +1265,14 @@ typedef struct SUnionDistinctSplitInfo {
|
|||
SLogicSubplan* pSubplan;
|
||||
} SUnionDistinctSplitInfo;
|
||||
|
||||
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) {
|
||||
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, int32_t startGroupId, SLogicSubplan* pSubplan,
|
||||
SAggLogicNode* pAgg) {
|
||||
SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
|
||||
if (NULL == pExchange) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pExchange->srcGroupId = pCxt->groupId;
|
||||
pExchange->srcStartGroupId = startGroupId;
|
||||
pExchange->srcEndGroupId = pCxt->groupId - 1;
|
||||
pExchange->node.precision = pAgg->node.precision;
|
||||
pExchange->node.pTargets = nodesCloneList(pAgg->pGroupKeys);
|
||||
if (NULL == pExchange->node.pTargets) {
|
||||
|
@ -1293,11 +1300,11 @@ static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan)
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t startGroupId = pCxt->groupId;
|
||||
int32_t code = unionSplitSubplan(pCxt, info.pSubplan, (SLogicNode*)info.pAgg);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = unDistSplCreateExchangeNode(pCxt, info.pSubplan, info.pAgg);
|
||||
code = unDistSplCreateExchangeNode(pCxt, startGroupId, info.pSubplan, info.pAgg);
|
||||
}
|
||||
++(pCxt->groupId);
|
||||
pCxt->split = true;
|
||||
return code;
|
||||
}
|
||||
|
@ -1430,7 +1437,7 @@ static const SSplitRule splitRuleSet[] = {
|
|||
{.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
|
||||
{.pName = "UnionAllSplit", .splitFunc = unionAllSplit},
|
||||
{.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit},
|
||||
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit},
|
||||
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}, // not used yet
|
||||
{.pName = "InsertSelectSplit", .splitFunc = insertSelectSplit}
|
||||
};
|
||||
// clang-format on
|
||||
|
|
|
@ -63,7 +63,7 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo
|
|||
static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDownstreamSourceNode* pSource) {
|
||||
if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pNode)) {
|
||||
SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode;
|
||||
if (pExchange->srcGroupId == groupId) {
|
||||
if (groupId >= pExchange->srcStartGroupId && groupId <= pExchange->srcEndGroupId) {
|
||||
return nodesListMakeStrictAppend(&pExchange->pSrcEndPoints, nodesCloneNode((SNode*)pSource));
|
||||
}
|
||||
} else if (QUERY_NODE_PHYSICAL_PLAN_MERGE == nodeType(pNode)) {
|
||||
|
|
Loading…
Reference in New Issue