diff --git a/include/libs/nodes/plannodes.h b/include/libs/nodes/plannodes.h index 084d642292..e52c313b8d 100644 --- a/include/libs/nodes/plannodes.h +++ b/include/libs/nodes/plannodes.h @@ -105,7 +105,8 @@ typedef struct SSubLogicPlan { typedef struct SQueryLogicPlan { ENodeType type;; - SNodeList* pSubplans; + int32_t totalLevel; + SNodeList* pTopSubplans; } SQueryLogicPlan; typedef struct SSlotDescNode { @@ -221,8 +222,8 @@ typedef struct SSubplan { typedef struct SQueryPlan { ENodeType type;; uint64_t queryId; - int32_t numOfSubplans; - SNodeList* pSubplans; // SNodeListNode. The execution level of subplan, starting from 0. + int32_t numOfSubplans; + SNodeList* pSubplans; // Element is SNodeListNode. The execution level of subplan, starting from 0. } SQueryPlan; #ifdef __cplusplus diff --git a/include/libs/planner/planner.h b/include/libs/planner/planner.h index 5d6ec46d85..70e35824b6 100644 --- a/include/libs/planner/planner.h +++ b/include/libs/planner/planner.h @@ -31,14 +31,14 @@ typedef struct SPlanContext { int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNodeList); // Set datasource of this subplan, multiple calls may be made to a subplan. -// @subplan subplan to be schedule -// @groupId id of a group of datasource subplans of this @subplan -// @ep one execution location of this group of datasource subplans -void qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource); +// @pSubplan subplan to be schedule +// @groupId id of a group of datasource subplans of this @pSubplan +// @pSource one execution location of this group of datasource subplans +int32_t qSetSubplanExecutionNode(SSubplan* pSubplan, int32_t groupId, SDownstreamSourceNode* pSource); // Convert to subplan to string for the scheduler to send to the executor -int32_t qSubPlanToString(const SSubplan* subplan, char** str, int32_t* len); -int32_t qStringToSubplan(const char* str, SSubplan** subplan); +int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen); +int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan); char* qQueryPlanToString(const SQueryPlan* pPlan); SQueryPlan* qStringToQueryPlan(const char* pStr); diff --git a/source/libs/planner/src/physicalPlan.c b/source/libs/planner/src/physicalPlan.c index b0c087b36a..265aac40fb 100644 --- a/source/libs/planner/src/physicalPlan.c +++ b/source/libs/planner/src/physicalPlan.c @@ -28,7 +28,6 @@ typedef struct SPhysiPlanContext { int16_t nextDataBlockId; SArray* pLocationHelper; SArray* pExecNodeList; - int32_t groupId; int32_t subplanId; } SPhysiPlanContext; @@ -537,6 +536,8 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubpl SSubplan* pSubplan = nodesMakeNode(QUERY_NODE_PHYSICAL_SUBPLAN); CHECK_ALLOC(pSubplan, NULL); pSubplan->id = pLogicSubplan->id; + pSubplan->subplanType = pLogicSubplan->subplanType; + pSubplan->level = pLogicSubplan->level; return pSubplan; } @@ -554,8 +555,6 @@ static SSubplan* createPhysiSubplan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLog pSubplan->pDataSink = createDataDispatcher(pCxt, pSubplan->pNode); pSubplan->msgType = TDMT_VND_QUERY; } - pSubplan->subplanType = pLogicSubplan->subplanType; - pSubplan->level = pLogicSubplan->level; return pSubplan; } @@ -603,7 +602,7 @@ static int32_t pushSubplan(SPhysiPlanContext* pCxt, SNodeptr pSubplan, int32_t l return TSDB_CODE_SUCCESS; } -SSubLogicPlan* singleCloneSubLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pSrc, int32_t level) { +static SSubLogicPlan* singleCloneSubLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pSrc, int32_t level) { SSubLogicPlan* pDst = nodesMakeNode(QUERY_NODE_LOGIC_SUBPLAN); CHECK_ALLOC(pDst, NULL); pDst->pNode = nodesCloneNode(pSrc->pNode); @@ -613,13 +612,13 @@ SSubLogicPlan* singleCloneSubLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* p } pDst->subplanType = pSrc->subplanType; pDst->level = level; - pDst->id.queryId = pCxt->pPlanCxt->queryId; - pDst->id.groupId = pCxt->groupId; + pDst->id.queryId = pSrc->id.queryId; + pDst->id.groupId = pSrc->id.groupId; pDst->id.subplanId = pCxt->subplanId++; return pDst; } -static int32_t scaleOutForModify(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SNodeList* pGroup) { +static int32_t scaleOutForModify(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SNodeList* pGroup) { SVnodeModifLogicNode* pNode = (SVnodeModifLogicNode*)pSubplan->pNode; size_t numOfVgroups = taosArrayGetSize(pNode->pDataBlocks); for (int32_t i = 0; i < numOfVgroups; ++i) { @@ -627,17 +626,12 @@ static int32_t scaleOutForModify(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubpla CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY); SVgDataBlocks* blocks = (SVgDataBlocks*)taosArrayGetP(pNode->pDataBlocks, i); ((SVnodeModifLogicNode*)pNewSubplan->pNode)->pVgDataBlocks = blocks; - // CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans)); CHECK_CODE_EXT(nodesListAppend(pGroup, pNewSubplan)); } return TSDB_CODE_SUCCESS; } -static int32_t scaleOutForMerge(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SNodeList* pGroup) { - // SSubLogicPlan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level); - // CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY); - // CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans)); - // return TSDB_CODE_SUCCESS; +static int32_t scaleOutForMerge(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SNodeList* pGroup) { return nodesListStrictAppend(pGroup, singleCloneSubLogicPlan(pCxt, pSubplan, level)); } @@ -665,40 +659,60 @@ static int32_t setScanVgroup(SPhysiPlanContext* pCxt, SLogicNode* pNode, const S return doSetScanVgroup(pCxt, pNode, pVgroup, &found); } -static int32_t scaleOutForScan(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SNodeList* pGroup) { +static int32_t scaleOutForScan(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SNodeList* pGroup) { if (pSubplan->pVgroupList) { for (int32_t i = 0; i < pSubplan->pVgroupList->numOfVgroups; ++i) { SSubLogicPlan* pNewSubplan = singleCloneSubLogicPlan(pCxt, pSubplan, level); CHECK_ALLOC(pNewSubplan, TSDB_CODE_OUT_OF_MEMORY); CHECK_CODE_EXT(setScanVgroup(pCxt, pNewSubplan->pNode, pSubplan->pVgroupList->vgroups + i)); - // CHECK_CODE_EXT(pushSubplan(pCxt, pNewSubplan, level, pLogicPlan->pSubplans)); CHECK_CODE_EXT(nodesListAppend(pGroup, pNewSubplan)); } return TSDB_CODE_SUCCESS; } else { - return scaleOutForMerge(pCxt, pSubplan, level, pLogicPlan, pGroup); + return scaleOutForMerge(pCxt, pSubplan, level, pGroup); } } -// static int32_t pushHierarchicalPlan(SNodeList* pParentsGroup, SNodeList* pCurrentGroup, int32_t level, SQueryLogicPlan* pLogicPlan) { -// FOREACH() { +static int32_t appendWithMakeList(SNodeList** pList, SNodeptr pNode) { + if (NULL == *pList) { + *pList = nodesMakeList(); + if (NULL == *pList) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + return nodesListAppend(*pList, pNode); +} -// } -// } +static int32_t pushHierarchicalPlan(SPhysiPlanContext* pCxt, SNodeList* pParentsGroup, SNodeList* pCurrentGroup) { + bool topLevel = (0 == LIST_LENGTH(pParentsGroup)); + SNode* pChild = NULL; + FOREACH(pChild, pCurrentGroup) { + if (topLevel) { + CHECK_CODE_EXT(nodesListAppend(pParentsGroup, pChild)); + } else { + SNode* pParent = NULL; + FOREACH(pParent, pParentsGroup) { + CHECK_CODE_EXT(appendWithMakeList(&(((SSubLogicPlan*)pParent)->pChildren), pChild)); + CHECK_CODE_EXT(appendWithMakeList(&(((SSubLogicPlan*)pChild)->pParents), pParent)); + } + } + } + return TSDB_CODE_SUCCESS; +} -static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t level, SQueryLogicPlan* pLogicPlan, SHashObj* pHash, SNodeList* pParentsGroup) { +static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int32_t* pLevel, SNodeList* pParentsGroup) { SNodeList* pCurrentGroup = nodesMakeList(); CHECK_ALLOC(pCurrentGroup, TSDB_CODE_OUT_OF_MEMORY); int32_t code = TSDB_CODE_SUCCESS; switch (pSubplan->subplanType) { case SUBPLAN_TYPE_MERGE: - code = scaleOutForMerge(pCxt, pSubplan, level, pLogicPlan, pCurrentGroup); + code = scaleOutForMerge(pCxt, pSubplan, *pLevel, pCurrentGroup); break; case SUBPLAN_TYPE_SCAN: - code = scaleOutForScan(pCxt, pSubplan, level, pLogicPlan, pCurrentGroup); + code = scaleOutForScan(pCxt, pSubplan, *pLevel, pCurrentGroup); break; case SUBPLAN_TYPE_MODIFY: - code = scaleOutForModify(pCxt, pSubplan, level, pLogicPlan, pCurrentGroup); + code = scaleOutForModify(pCxt, pSubplan, *pLevel, pCurrentGroup); break; default: break; @@ -706,12 +720,12 @@ static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int3 if (TSDB_CODE_SUCCESS != code) { return code; } - // pushHierarchicalPlan(pParentsGroup, pCurrentGroup, level, pLogicPlan); - CHECK_CODE(taosHashPut(pHash, &pCxt->groupId, sizeof(pCxt->groupId), &pSubplan->id.groupId, sizeof(pSubplan->id.groupId)), TSDB_CODE_OUT_OF_MEMORY); - ++(pCxt->groupId); + + CHECK_CODE_EXT(pushHierarchicalPlan(pCxt, pParentsGroup, pCurrentGroup)); + ++(*pLevel); SNode* pChild; FOREACH(pChild, pSubplan->pChildren) { - CHECK_CODE_EXT(doScaleOut(pCxt, (SSubLogicPlan*)pChild, level + 1, pLogicPlan, pHash, pCurrentGroup)); + CHECK_CODE_EXT(doScaleOut(pCxt, (SSubLogicPlan*)pChild, pLevel, pCurrentGroup)); } return TSDB_CODE_SUCCESS; @@ -720,75 +734,18 @@ static int32_t doScaleOut(SPhysiPlanContext* pCxt, SSubLogicPlan* pSubplan, int3 static SQueryLogicPlan* makeQueryLogicPlan(SPhysiPlanContext* pCxt) { SQueryLogicPlan* pLogicPlan = (SQueryLogicPlan*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN); CHECK_ALLOC(pLogicPlan, NULL); - pLogicPlan->pSubplans = nodesMakeList(); - if (NULL == pLogicPlan->pSubplans) { + pLogicPlan->pTopSubplans = nodesMakeList(); + if (NULL == pLogicPlan->pTopSubplans) { nodesDestroyNode(pLogicPlan); return NULL; } return pLogicPlan; } -static int32_t doMappingLogicPlan(SLogicNode* pNode, SHashObj* pHash) { - if (QUERY_NODE_LOGIC_PLAN_EXCHANGE == nodeType(pNode)) { - SExchangeLogicNode* pExchange = (SExchangeLogicNode*)pNode; - int32_t* pGroupId = taosHashGet(pHash, &pExchange->srcGroupId, sizeof(pExchange->srcGroupId)); - if (NULL == pGroupId) { - return TSDB_CODE_FAILED; - } - pExchange->srcGroupId = *pGroupId; - return TSDB_CODE_SUCCESS; - } - - SNode* pChild; - FOREACH(pChild, pNode->pChildren) { - doMappingLogicPlan((SLogicNode*)pChild, pHash); - } - return TSDB_CODE_SUCCESS; -} - -static int32_t mappingLogicPlan(SQueryLogicPlan* pLogicPlan, SHashObj* pHash) { - SNode* pNode = NULL; - FOREACH(pNode, pLogicPlan->pSubplans) { - SNode* pSubplan = NULL; - FOREACH(pSubplan, ((SNodeListNode*)pNode)->pNodeList) { - int32_t code = doMappingLogicPlan(((SSubLogicPlan*)pSubplan)->pNode, pHash); - if (TSDB_CODE_SUCCESS != code) { - return code; - } - } - } - return TSDB_CODE_SUCCESS; -} - static int32_t scaleOutLogicPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pRootSubLogicPlan, SQueryLogicPlan** pLogicPlan) { *pLogicPlan = makeQueryLogicPlan(pCxt); CHECK_ALLOC(*pLogicPlan, TSDB_CODE_OUT_OF_MEMORY); - SHashObj* pHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); - CHECK_ALLOC(pHash, TSDB_CODE_OUT_OF_MEMORY); - int32_t code = doScaleOut(pCxt, pRootSubLogicPlan, 0, *pLogicPlan, pHash, NULL); - if (TSDB_CODE_SUCCESS == code) { - code = mappingLogicPlan(*pLogicPlan, pHash); - } - taosHashCleanup(pHash); - return code; -} - -typedef struct SBuildPhysiSubplanCxt { - int32_t errCode; - SQueryPlan* pQueryPlan; - SPhysiPlanContext* pPhyCxt; -} SBuildPhysiSubplanCxt; - -static EDealRes doBuildPhysiSubplan(SNode* pNode, void* pContext) { - SBuildPhysiSubplanCxt* pCxt = (SBuildPhysiSubplanCxt*)pContext; - if (QUERY_NODE_LOGIC_SUBPLAN == nodeType(pNode)) { - SSubplan* pSubplan = createPhysiSubplan(pCxt->pPhyCxt, (SSubLogicPlan*)pNode); - CHECK_ALLOC(pSubplan, DEAL_RES_ERROR); - CHECK_CODE(pushSubplan(pCxt->pPhyCxt, pSubplan, ((SSubLogicPlan*)pNode)->level, pCxt->pQueryPlan->pSubplans), DEAL_RES_ERROR); - ++(pCxt->pQueryPlan->numOfSubplans); - return DEAL_RES_IGNORE_CHILD; - } - return DEAL_RES_CONTINUE; + return doScaleOut(pCxt, pRootSubLogicPlan, &((*pLogicPlan)->totalLevel), (*pLogicPlan)->pTopSubplans); } static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) { @@ -803,15 +760,31 @@ static SQueryPlan* makeQueryPhysiPlan(SPhysiPlanContext* pCxt) { return pPlan; } -static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan) { - SBuildPhysiSubplanCxt cxt = { .errCode = TSDB_CODE_SUCCESS, .pQueryPlan = makeQueryPhysiPlan(pCxt), .pPhyCxt = pCxt }; - CHECK_ALLOC(cxt.pQueryPlan, TSDB_CODE_OUT_OF_MEMORY); - nodesWalkList(pLogicPlan->pSubplans, doBuildPhysiSubplan, &cxt); - if (TSDB_CODE_SUCCESS != cxt.errCode) { - nodesDestroyNode(cxt.pQueryPlan); - return cxt.errCode; +static int32_t doBuildPhysiPlan(SPhysiPlanContext* pCxt, SSubLogicPlan* pLogicSubplan, SSubplan* pParent, SQueryPlan* pQueryPlan) { + SSubplan* pSubplan = createPhysiSubplan(pCxt, pLogicSubplan); + CHECK_ALLOC(pSubplan, DEAL_RES_ERROR); + CHECK_CODE_EXT(pushSubplan(pCxt, pSubplan, pLogicSubplan->level, pQueryPlan->pSubplans)); + ++(pQueryPlan->numOfSubplans); + if (NULL != pParent) { + CHECK_CODE_EXT(appendWithMakeList(&pParent->pChildren, pSubplan)); + CHECK_CODE_EXT(appendWithMakeList(&pSubplan->pParents, pParent)); + } + + SNode* pChild = NULL; + FOREACH(pChild, pLogicSubplan->pChildren) { + CHECK_CODE_EXT(doBuildPhysiPlan(pCxt, (SSubLogicPlan*)pChild, pSubplan, pQueryPlan)); + } + + return TSDB_CODE_SUCCESS; +} + +static int32_t buildPhysiPlan(SPhysiPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan) { + *pPlan = makeQueryPhysiPlan(pCxt); + CHECK_ALLOC(*pPlan, TSDB_CODE_OUT_OF_MEMORY); + SNode* pSubplan = NULL; + FOREACH(pSubplan, pLogicPlan->pTopSubplans) { + CHECK_CODE_EXT(doBuildPhysiPlan(pCxt, (SSubLogicPlan*)pSubplan, NULL, *pPlan)); } - *pPlan = cxt.pQueryPlan; return TSDB_CODE_SUCCESS; } diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index 4fa9b6ba10..15d5c09f04 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -34,31 +34,66 @@ int32_t qCreateQueryPlan(SPlanContext* pCxt, SQueryPlan** pPlan, SArray* pExecNo return code; } -void qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) { +static int32_t setSubplanExecutionNode(SPhysiNode* pNode, int32_t groupId, SDownstreamSourceNode* pSource) { + if (QUERY_NODE_PHYSICAL_PLAN_EXCHANGE == nodeType(pNode)) { + SExchangePhysiNode* pExchange = (SExchangePhysiNode*)pNode; + if (pExchange->srcGroupId == groupId) { + if (NULL == pExchange->pSrcEndPoints) { + pExchange->pSrcEndPoints = nodesMakeList(); + if (NULL == pExchange->pSrcEndPoints) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + if (TSDB_CODE_SUCCESS != nodesListStrictAppend(pExchange->pSrcEndPoints, nodesCloneNode(pSource))) { + return TSDB_CODE_OUT_OF_MEMORY; + } + return TSDB_CODE_SUCCESS; + } + } + SNode* pChild = NULL; + FOREACH(pChild, pNode->pChildren) { + if (TSDB_CODE_SUCCESS != setSubplanExecutionNode((SPhysiNode*)pChild, groupId, pSource)) { + return TSDB_CODE_OUT_OF_MEMORY; + } + } + return TSDB_CODE_SUCCESS; } -int32_t qSubPlanToString(const SSubplan* subplan, char** str, int32_t* len) { - if (SUBPLAN_TYPE_MODIFY == subplan->subplanType) { - SDataInserterNode* insert = (SDataInserterNode*)subplan->pDataSink; - *len = insert->size; - *str = insert->pData; +int32_t qSetSubplanExecutionNode(SSubplan* subplan, int32_t groupId, SDownstreamSourceNode* pSource) { + return setSubplanExecutionNode(subplan->pNode, groupId, pSource); +} + +int32_t qSubPlanToString(const SSubplan* pSubplan, char** pStr, int32_t* pLen) { + if (SUBPLAN_TYPE_MODIFY == pSubplan->subplanType) { + SDataInserterNode* insert = (SDataInserterNode*)pSubplan->pDataSink; + *pLen = insert->size; + *pStr = insert->pData; insert->pData = NULL; return TSDB_CODE_SUCCESS; } - return nodesNodeToString((const SNode*)subplan, false, str, len); + return nodesNodeToString((const SNode*)pSubplan, false, pStr, pLen); } -int32_t qStringToSubplan(const char* str, SSubplan** subplan) { - return nodesStringToNode(str, (SNode**)subplan); +int32_t qStringToSubplan(const char* pStr, SSubplan** pSubplan) { + return nodesStringToNode(pStr, (SNode**)pSubplan); } char* qQueryPlanToString(const SQueryPlan* pPlan) { - + char* pStr = NULL; + int32_t len = 0; + if (TSDB_CODE_SUCCESS != nodesNodeToString(pPlan, false, &pStr, &len)) { + return NULL; + } + return pStr; } SQueryPlan* qStringToQueryPlan(const char* pStr) { - + SQueryPlan* pPlan = NULL; + if (TSDB_CODE_SUCCESS != nodesStringToNode(pStr, (SNode**)&pPlan)) { + return NULL; + } + return pPlan; } void qDestroyQueryPlan(SQueryPlan* pPlan) {