Merge pull request #13440 from taosdata/feature/3.0_wxy
feat: group by distributed split
This commit is contained in:
commit
0ef1a19b23
|
@ -226,6 +226,9 @@ static SColumnNode* createColumnByFunc(const SFunctionNode* pFunc) {
|
|||
}
|
||||
|
||||
bool fmIsDistExecFunc(int32_t funcId) {
|
||||
if (fmIsUserDefinedFunc(funcId)) {
|
||||
return false;
|
||||
}
|
||||
if (!fmIsVectorFunc(funcId)) {
|
||||
return true;
|
||||
}
|
||||
|
|
|
@ -151,8 +151,8 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
|
|||
|
||||
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
|
||||
switch (nodeType(pNode)) {
|
||||
// case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||
// return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(pNode);
|
||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
||||
case QUERY_NODE_LOGIC_PLAN_WINDOW: {
|
||||
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
|
||||
if (WINDOW_TYPE_INTERVAL != pWindow->winType) {
|
||||
|
@ -365,6 +365,66 @@ static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInf
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
|
||||
SNodeList* pFunc = pMergeAgg->pAggFuncs;
|
||||
pMergeAgg->pAggFuncs = NULL;
|
||||
SNodeList* pGroupKeys = pMergeAgg->pGroupKeys;
|
||||
pMergeAgg->pGroupKeys = NULL;
|
||||
SNodeList* pTargets = pMergeAgg->node.pTargets;
|
||||
pMergeAgg->node.pTargets = NULL;
|
||||
SNodeList* pChildren = pMergeAgg->node.pChildren;
|
||||
pMergeAgg->node.pChildren = NULL;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SAggLogicNode* pPartAgg = nodesCloneNode(pMergeAgg);
|
||||
if (NULL == pPartAgg) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
|
||||
pPartAgg->pGroupKeys = pGroupKeys;
|
||||
code = createColumnByRewriteExps(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
|
||||
pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets);
|
||||
if (NULL == pMergeAgg->pGroupKeys) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pMergeAgg->node.pTargets = pTargets;
|
||||
pPartAgg->node.pChildren = pChildren;
|
||||
|
||||
code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = createColumnByRewriteExps(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
|
||||
}
|
||||
|
||||
nodesDestroyList(pFunc);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pOutput = (SLogicNode*)pPartAgg;
|
||||
} else {
|
||||
nodesDestroyNode(pPartAgg);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
SLogicNode* pPartAgg = NULL;
|
||||
int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
||||
splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT));
|
||||
}
|
||||
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
|
@ -386,6 +446,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
|||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
switch (nodeType(info.pSplitNode)) {
|
||||
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||
code = stbSplSplitAggNode(pCxt, &info);
|
||||
break;
|
||||
case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
||||
code = stbSplSplitWindowNode(pCxt, &info);
|
||||
break;
|
||||
|
|
|
@ -67,3 +67,11 @@ TEST_F(PlanGroupByTest, selectFunc) {
|
|||
run("SELECT MAX(c1), c2 FROM t1 GROUP BY c3");
|
||||
run("SELECT MAX(c1), t1.* FROM t1 GROUP BY c3");
|
||||
}
|
||||
|
||||
TEST_F(PlanGroupByTest, stable) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT COUNT(*) FROM st1");
|
||||
|
||||
run("SELECT COUNT(*) FROM st1 GROUP BY c1");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue