feat: group by distributed split
This commit is contained in:
parent
57ad40e32f
commit
3fe2f16213
|
@ -151,8 +151,8 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
|
||||||
|
|
||||||
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
|
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
|
||||||
switch (nodeType(pNode)) {
|
switch (nodeType(pNode)) {
|
||||||
// case QUERY_NODE_LOGIC_PLAN_AGG:
|
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||||
// return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(pNode);
|
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
|
||||||
case QUERY_NODE_LOGIC_PLAN_WINDOW: {
|
case QUERY_NODE_LOGIC_PLAN_WINDOW: {
|
||||||
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
|
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNode;
|
||||||
if (WINDOW_TYPE_INTERVAL != pWindow->winType) {
|
if (WINDOW_TYPE_INTERVAL != pWindow->winType) {
|
||||||
|
@ -365,6 +365,66 @@ static int32_t stbSplSplitWindowNode(SSplitContext* pCxt, SStableSplitInfo* pInf
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pOutput) {
|
||||||
|
SNodeList* pFunc = pMergeAgg->pAggFuncs;
|
||||||
|
pMergeAgg->pAggFuncs = NULL;
|
||||||
|
SNodeList* pGroupKeys = pMergeAgg->pGroupKeys;
|
||||||
|
pMergeAgg->pGroupKeys = NULL;
|
||||||
|
SNodeList* pTargets = pMergeAgg->node.pTargets;
|
||||||
|
pMergeAgg->node.pTargets = NULL;
|
||||||
|
SNodeList* pChildren = pMergeAgg->node.pChildren;
|
||||||
|
pMergeAgg->node.pChildren = NULL;
|
||||||
|
|
||||||
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
|
SAggLogicNode* pPartAgg = nodesCloneNode(pMergeAgg);
|
||||||
|
if (NULL == pPartAgg) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
|
||||||
|
pPartAgg->pGroupKeys = pGroupKeys;
|
||||||
|
code = createColumnByRewriteExps(pPartAgg->pGroupKeys, &pPartAgg->node.pTargets);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code && NULL != pGroupKeys) {
|
||||||
|
pMergeAgg->pGroupKeys = nodesCloneList(pPartAgg->node.pTargets);
|
||||||
|
if (NULL == pMergeAgg->pGroupKeys) {
|
||||||
|
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
pMergeAgg->node.pTargets = pTargets;
|
||||||
|
pPartAgg->node.pChildren = pChildren;
|
||||||
|
|
||||||
|
code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = createColumnByRewriteExps(pPartAgg->pAggFuncs, &pPartAgg->node.pTargets);
|
||||||
|
}
|
||||||
|
|
||||||
|
nodesDestroyList(pFunc);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
*pOutput = (SLogicNode*)pPartAgg;
|
||||||
|
} else {
|
||||||
|
nodesDestroyNode(pPartAgg);
|
||||||
|
}
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t stbSplSplitAggNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||||
|
SLogicNode* pPartAgg = NULL;
|
||||||
|
int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
|
||||||
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,
|
||||||
|
splCreateScanSubplan(pCxt, pPartAgg, SPLIT_FLAG_STABLE_SPLIT));
|
||||||
|
}
|
||||||
|
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
|
||||||
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE);
|
int32_t code = splCreateExchangeNodeForSubplan(pCxt, pInfo->pSubplan, pInfo->pSplitNode, SUBPLAN_TYPE_MERGE);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
@ -386,6 +446,9 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
|
||||||
|
|
||||||
int32_t code = TSDB_CODE_SUCCESS;
|
int32_t code = TSDB_CODE_SUCCESS;
|
||||||
switch (nodeType(info.pSplitNode)) {
|
switch (nodeType(info.pSplitNode)) {
|
||||||
|
case QUERY_NODE_LOGIC_PLAN_AGG:
|
||||||
|
code = stbSplSplitAggNode(pCxt, &info);
|
||||||
|
break;
|
||||||
case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
case QUERY_NODE_LOGIC_PLAN_WINDOW:
|
||||||
code = stbSplSplitWindowNode(pCxt, &info);
|
code = stbSplSplitWindowNode(pCxt, &info);
|
||||||
break;
|
break;
|
||||||
|
|
Loading…
Reference in New Issue