feat: optimize select agg_func partition by tag slimit

This commit is contained in:
wangjiaming0909 2023-08-10 11:09:21 +08:00
parent 68e0fb924a
commit e587cc50e6
9 changed files with 124 additions and 30 deletions

View File

@ -603,6 +603,8 @@ typedef struct SSubplan {
SNode* pTagCond;
SNode* pTagIndexCond;
bool showRewrite;
int32_t rowsThreshold;
bool dynamicRowThreshold;
} SSubplan;
typedef enum EExplainMode { EXPLAIN_MODE_DISABLE = 1, EXPLAIN_MODE_STATIC, EXPLAIN_MODE_ANALYZE } EExplainMode;

View File

@ -589,6 +589,10 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
int64_t st = taosGetTimestampUs();
int32_t blockIndex = 0;
int32_t rowsThreshold = pTaskInfo->pSubplan->rowsThreshold;
if (!pTaskInfo->pSubplan->dynamicRowThreshold || 4096 <= pTaskInfo->pSubplan->rowsThreshold) {
rowsThreshold = 4096;
}
while ((pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot)) != NULL) {
SSDataBlock* p = NULL;
if (blockIndex >= taosArrayGetSize(pTaskInfo->pResultBlockList)) {
@ -606,10 +610,13 @@ int32_t qExecTaskOpt(qTaskInfo_t tinfo, SArray* pResList, uint64_t* useconds, bo
ASSERT(p->info.rows > 0);
taosArrayPush(pResList, &p);
if (current >= 4096) {
if (current >= rowsThreshold) {
break;
}
}
if (pTaskInfo->pSubplan->dynamicRowThreshold) {
pTaskInfo->pSubplan->rowsThreshold -= current;
}
*hasMore = (pRes != NULL);
uint64_t el = (taosGetTimestampUs() - st);

View File

@ -2814,6 +2814,8 @@ static const char* jkSubplanDataSink = "DataSink";
static const char* jkSubplanTagCond = "TagCond";
static const char* jkSubplanTagIndexCond = "TagIndexCond";
static const char* jkSubplanShowRewrite = "ShowRewrite";
static const char* jkSubplanRowsThreshold = "RowThreshold";
static const char* jkSubplanDynamicRowsThreshold = "DyRowThreshold";
static int32_t subplanToJson(const void* pObj, SJson* pJson) {
const SSubplan* pNode = (const SSubplan*)pObj;
@ -2852,6 +2854,12 @@ static int32_t subplanToJson(const void* pObj, SJson* pJson) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSubplanShowRewrite, pNode->showRewrite);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddIntegerToObject(pJson, jkSubplanRowsThreshold, pNode->rowsThreshold);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonAddBoolToObject(pJson, jkSubplanDynamicRowsThreshold, pNode->dynamicRowThreshold);
}
return code;
}
@ -2893,6 +2901,12 @@ static int32_t jsonToSubplan(const SJson* pJson, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkSubplanShowRewrite, &pNode->showRewrite);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetIntValue(pJson, jkSubplanRowsThreshold, &pNode->rowsThreshold);
}
if (TSDB_CODE_SUCCESS == code) {
code = tjsonGetBoolValue(pJson, jkSubplanDynamicRowsThreshold, &pNode->dynamicRowThreshold);
}
return code;
}

View File

@ -3538,6 +3538,12 @@ static int32_t subplanInlineToMsg(const void* pObj, STlvEncoder* pEncoder) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->showRewrite);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueI32(pEncoder, pNode->rowsThreshold);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvEncodeValueBool(pEncoder, pNode->dynamicRowThreshold);
}
return code;
}
@ -3587,6 +3593,12 @@ static int32_t msgToSubplanInline(STlvDecoder* pDecoder, void* pObj) {
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->showRewrite);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueI32(pDecoder, &pNode->rowsThreshold);
}
if (TSDB_CODE_SUCCESS == code) {
code = tlvDecodeValueBool(pDecoder, &pNode->dynamicRowThreshold);
}
return code;
}

View File

@ -43,8 +43,14 @@ int32_t splitLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan);
int32_t scaleOutLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan, SQueryLogicPlan** pLogicPlan);
int32_t createPhysiPlan(SPlanContext* pCxt, SQueryLogicPlan* pLogicPlan, SQueryPlan** pPlan, SArray* pExecNodeList);
bool isPartTableAgg(SAggLogicNode* pAgg);
bool isPartTableWinodw(SWindowLogicNode* pWindow);
bool isPartTableAgg(SAggLogicNode* pAgg);
bool isPartTagAgg(SAggLogicNode* pAgg);
bool isPartTableWinodw(SWindowLogicNode* pWindow);
#define CLONE_LIMIT 1
#define CLONE_SLIMIT 1 << 1
#define CLONE_LIMIT_SLIMIT (CLONE_LIMIT | CLONE_SLIMIT)
bool cloneLimit(SLogicNode* pParent, SLogicNode* pChild, uint8_t cloneWhat);
#ifdef __cplusplus
}

View File

@ -368,8 +368,8 @@ static void scanPathOptSetGroupOrderScan(SScanLogicNode* pScan) {
if (pScan->node.pParent && nodeType(pScan->node.pParent) == QUERY_NODE_LOGIC_PLAN_AGG) {
SAggLogicNode* pAgg = (SAggLogicNode*)pScan->node.pParent;
bool withSlimit = pAgg->node.pSlimit != NULL || (pAgg->node.pParent && pAgg->node.pParent->pSlimit);
if (withSlimit && isPartTableAgg(pAgg)) {
bool withSlimit = pAgg->node.pSlimit != NULL;
if (withSlimit && (isPartTableAgg(pAgg) || isPartTagAgg(pAgg))) {
pScan->groupOrderScan = pAgg->node.forceCreateNonBlockingOptr = true;
}
}
@ -2698,39 +2698,31 @@ static void swapLimit(SLogicNode* pParent, SLogicNode* pChild) {
pParent->pLimit = NULL;
}
static void cloneLimit(SLogicNode* pParent, SLogicNode* pChild) {
SLimitNode* pLimit = NULL;
if (pParent->pLimit) {
pChild->pLimit = nodesCloneNode(pParent->pLimit);
pLimit = (SLimitNode*)pChild->pLimit;
pLimit->limit += pLimit->offset;
pLimit->offset = 0;
}
if (pParent->pSlimit) {
pChild->pSlimit = nodesCloneNode(pParent->pSlimit);
pLimit = (SLimitNode*)pChild->pSlimit;
pLimit->limit += pLimit->offset;
pLimit->offset = 0;
}
}
static bool pushDownLimitHow(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPushTo);
static bool pushDownLimitTo(SLogicNode* pNodeWithLimit, SLogicNode* pNodeLimitPushTo) {
switch (nodeType(pNodeLimitPushTo)) {
case QUERY_NODE_LOGIC_PLAN_WINDOW: {
SWindowLogicNode* pWindow = (SWindowLogicNode*)pNodeLimitPushTo;
if (pWindow->winType != WINDOW_TYPE_INTERVAL) break;
cloneLimit(pNodeWithLimit, pNodeLimitPushTo);
cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT_SLIMIT);
return true;
}
case QUERY_NODE_LOGIC_PLAN_FILL:
case QUERY_NODE_LOGIC_PLAN_SORT: {
cloneLimit(pNodeWithLimit, pNodeLimitPushTo);
cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_LIMIT_SLIMIT);
SNode* pChild = NULL;
FOREACH(pChild, pNodeLimitPushTo->pChildren) { pushDownLimitHow(pNodeLimitPushTo, (SLogicNode*)pChild); }
return true;
}
case QUERY_NODE_LOGIC_PLAN_AGG: {
if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT &&
(isPartTagAgg((SAggLogicNode*)pNodeLimitPushTo) || isPartTableAgg((SAggLogicNode*)pNodeLimitPushTo))) {
// when part by tag, slimit will be cloned to agg, and it will be pipelined.
// The scan below will do scanning with group order
return cloneLimit(pNodeWithLimit, pNodeLimitPushTo, CLONE_SLIMIT);
}
break;
}
case QUERY_NODE_LOGIC_PLAN_SCAN:
if (nodeType(pNodeWithLimit) == QUERY_NODE_LOGIC_PLAN_PROJECT && pNodeWithLimit->pLimit) {
swapLimit(pNodeWithLimit, pNodeLimitPushTo);

View File

@ -872,12 +872,16 @@ static int32_t rewritePrecalcExpr(SPhysiPlanContext* pCxt, SNode* pNode, SNodeLi
}
static int32_t createAggPhysiNode(SPhysiPlanContext* pCxt, SNodeList* pChildren, SAggLogicNode* pAggLogicNode,
SPhysiNode** pPhyNode) {
SPhysiNode** pPhyNode, SSubplan* pSubPlan) {
SAggPhysiNode* pAgg =
(SAggPhysiNode*)makePhysiNode(pCxt, (SLogicNode*)pAggLogicNode, QUERY_NODE_PHYSICAL_PLAN_HASH_AGG);
if (NULL == pAgg) {
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pAgg->node.pSlimit) {
pSubPlan->dynamicRowThreshold = true;
pSubPlan->rowsThreshold = ((SLimitNode*)pAgg->node.pSlimit)->limit;
}
pAgg->mergeDataBlock = (GROUP_ACTION_KEEP == pAggLogicNode->node.groupAction ? false : true);
pAgg->groupKeyOptimized = pAggLogicNode->hasGroupKeyOptimized;
@ -1617,7 +1621,7 @@ static int32_t doCreatePhysiNode(SPhysiPlanContext* pCxt, SLogicNode* pLogicNode
case QUERY_NODE_LOGIC_PLAN_JOIN:
return createJoinPhysiNode(pCxt, pChildren, (SJoinLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_AGG:
return createAggPhysiNode(pCxt, pChildren, (SAggLogicNode*)pLogicNode, pPhyNode);
return createAggPhysiNode(pCxt, pChildren, (SAggLogicNode*)pLogicNode, pPhyNode, pSubplan);
case QUERY_NODE_LOGIC_PLAN_PROJECT:
return createProjectPhysiNode(pCxt, pChildren, (SProjectLogicNode*)pLogicNode, pPhyNode);
case QUERY_NODE_LOGIC_PLAN_EXCHANGE:
@ -1721,6 +1725,8 @@ static SSubplan* makeSubplan(SPhysiPlanContext* pCxt, SLogicSubplan* pLogicSubpl
pSubplan->id = pLogicSubplan->id;
pSubplan->subplanType = pLogicSubplan->subplanType;
pSubplan->level = pLogicSubplan->level;
pSubplan->rowsThreshold = 4096;
pSubplan->dynamicRowThreshold = false;
if (NULL != pCxt->pPlanCxt->pUser) {
snprintf(pSubplan->user, sizeof(pSubplan->user), "%s", pCxt->pPlanCxt->pUser);
}

View File

@ -867,8 +867,16 @@ static int32_t stbSplSplitAggNodeForPartTable(SSplitContext* pCxt, SStableSplitI
static int32_t stbSplSplitAggNodeForCrossTable(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
SLogicNode* pPartAgg = NULL;
int32_t code = stbSplCreatePartAggNode((SAggLogicNode*)pInfo->pSplitNode, &pPartAgg);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
// if slimit was pushed down to agg, agg will be pipelined mode, add sort merge before parent agg
if ((SAggLogicNode*)pInfo->pSplitNode->pSlimit)
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, NULL, pPartAgg, true);
else
code = stbSplCreateExchangeNode(pCxt, pInfo->pSplitNode, pPartAgg);
} else {
nodesDestroyNode((SNode*)pPartAgg);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pInfo->pSubplan->pChildren,

View File

@ -349,7 +349,7 @@ static bool stbHasPartTbname(SNodeList* pPartKeys) {
return false;
}
static SNodeList* stbSplGetPartKeys(SLogicNode* pNode) {
static SNodeList* stbGetPartKeys(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return ((SScanLogicNode*)pNode)->pGroupTags;
} else if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
@ -367,11 +367,58 @@ bool isPartTableAgg(SAggLogicNode* pAgg) {
return stbHasPartTbname(pAgg->pGroupKeys) &&
stbNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0));
}
return stbHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
return stbHasPartTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
}
static bool stbHasPartTag(SNodeList* pPartKeys) {
if (NULL == pPartKeys) {
return false;
}
SNode* pPartKey = NULL;
FOREACH(pPartKey, pPartKeys) {
if (QUERY_NODE_GROUPING_SET == nodeType(pPartKey)) {
pPartKey = nodesListGetNode(((SGroupingSetNode*)pPartKey)->pParameterList, 0);
}
if ((QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TAGS == ((SFunctionNode*)pPartKey)->funcType) ||
(QUERY_NODE_COLUMN == nodeType(pPartKey) && COLUMN_TYPE_TAG == ((SColumnNode*)pPartKey)->colType)) {
return true;
}
}
return false;
}
bool isPartTagAgg(SAggLogicNode* pAgg) {
if (1 != LIST_LENGTH(pAgg->node.pChildren)) {
return false;
}
if (pAgg->pGroupKeys) {
return stbHasPartTag(pAgg->pGroupKeys) &&
stbNotSystemScan((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0));
}
return stbHasPartTag(stbGetPartKeys((SLogicNode*)nodesListGetNode(pAgg->node.pChildren, 0)));
}
bool isPartTableWinodw(SWindowLogicNode* pWindow) {
return stbHasPartTbname(stbSplGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
return stbHasPartTbname(stbGetPartKeys((SLogicNode*)nodesListGetNode(pWindow->node.pChildren, 0)));
}
bool cloneLimit(SLogicNode* pParent, SLogicNode* pChild, uint8_t cloneWhat) {
SLimitNode* pLimit;
bool cloned = false;
if (pParent->pLimit && (cloneWhat & CLONE_LIMIT)) {
pChild->pLimit = nodesCloneNode(pParent->pLimit);
pLimit = (SLimitNode*)pChild->pLimit;
pLimit->limit += pLimit->offset;
pLimit->offset = 0;
cloned = true;
}
if (pParent->pSlimit && (cloneWhat & CLONE_SLIMIT)) {
pChild->pSlimit = nodesCloneNode(pParent->pSlimit);
pLimit = (SLimitNode*)pChild->pSlimit;
pLimit->limit += pLimit->offset;
pLimit->offset = 0;
cloned = true;
}
return cloned;
}