Merge pull request #13832 from taosdata/feature/3.0_debug_wxy

feat: execution plan of super table join
This commit is contained in:
Xiaoyu Wang 2022-06-15 09:18:25 +08:00 committed by GitHub
commit ddaf616473
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 213 additions and 120 deletions

View File

@ -91,8 +91,8 @@ static const SSysDbTableSchema userDBSchema[] = {
{.name = "precision", .bytes = 2 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "single_stable_model", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL},
{.name = "status", .bytes = 10 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
// {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL},
{.name = "retension", .bytes = 60 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
// {.name = "schemaless", .bytes = 1, .type = TSDB_DATA_TYPE_BOOL},
{.name = "retention", .bytes = 60 + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
// {.name = "update", .bytes = 1, .type = TSDB_DATA_TYPE_TINYINT}, // disable update
};
@ -137,7 +137,7 @@ static const SSysDbTableSchema streamSchema[] = {
{.name = "target_table", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "watermark", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "trigger", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
};
};
static const SSysDbTableSchema userTblsSchema[] = {
{.name = "table_name", .bytes = SYSTABLE_SCH_TABLE_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
@ -221,7 +221,9 @@ static const SSysDbTableSchema transSchema[] = {
{.name = "db", .bytes = SYSTABLE_SCH_DB_NAME_LEN, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "failed_times", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "last_exec_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
{.name = "last_action_info", .bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "last_action_info",
.bytes = (TSDB_TRANS_ERROR_LEN - 1) + VARSTR_HEADER_SIZE,
.type = TSDB_DATA_TYPE_VARCHAR},
};
static const SSysDbTableSchema configSchema[] = {
@ -314,8 +316,6 @@ static const SSysDbTableSchema querySchema[] = {
{.name = "sql", .bytes = TSDB_SHOW_SQL_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
};
static const SSysTableMeta perfsMeta[] = {
{TSDB_PERFS_TABLE_CONNECTIONS, connectionsSchema, tListLen(connectionsSchema)},
{TSDB_PERFS_TABLE_QUERIES, querySchema, tListLen(querySchema)},

View File

@ -183,12 +183,12 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw) {
pDb->cfg.pRetensions = taosArrayInit(pDb->cfg.numOfRetensions, sizeof(SRetention));
if (pDb->cfg.pRetensions == NULL) goto _OVER;
for (int32_t i = 0; i < pDb->cfg.numOfRetensions; ++i) {
SRetention retension = {0};
SDB_GET_INT64(pRaw, dataPos, &retension.freq, _OVER)
SDB_GET_INT64(pRaw, dataPos, &retension.keep, _OVER)
SDB_GET_INT8(pRaw, dataPos, &retension.freqUnit, _OVER)
SDB_GET_INT8(pRaw, dataPos, &retension.keepUnit, _OVER)
if (taosArrayPush(pDb->cfg.pRetensions, &retension) == NULL) {
SRetention retention = {0};
SDB_GET_INT64(pRaw, dataPos, &retention.freq, _OVER)
SDB_GET_INT64(pRaw, dataPos, &retention.keep, _OVER)
SDB_GET_INT8(pRaw, dataPos, &retention.freqUnit, _OVER)
SDB_GET_INT8(pRaw, dataPos, &retention.keepUnit, _OVER)
if (taosArrayPush(pDb->cfg.pRetensions, &retention) == NULL) {
goto _OVER;
}
}
@ -1382,7 +1382,7 @@ static void dumpDbInfoData(SSDataBlock *pBlock, SDbObj *pDb, SShowObj *pShow, in
char *status = "ready";
if (objStatus == SDB_STATUS_CREATING) status = "creating";
if (objStatus == SDB_STATUS_DROPPING) status = "dropping";
char statusB[24] = {0};
char statusB[24] = {0};
STR_WITH_SIZE_TO_VARSTR(statusB, status, strlen(status));
if (sysDb) {

View File

@ -15,6 +15,7 @@
#include "functionMgt.h"
#include "planInt.h"
#include "tglobal.h"
#define SPLIT_FLAG_MASK(n) (1 << n)
@ -37,7 +38,8 @@ typedef struct SSplitRule {
FSplit splitFunc;
} SSplitRule;
typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, void* pInfo);
// typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, void* pInfo);
typedef bool (*FSplFindSplitNode)(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, void* pInfo);
static void splSetSubplanVgroups(SLogicSubplan* pSubplan, SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
@ -95,9 +97,23 @@ static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubpla
return code;
}
static bool splMatchByNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode, FSplFindSplitNode func,
void* pInfo) {
if (func(pCxt, pSubplan, pNode, pInfo)) {
return true;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
if (splMatchByNode(pCxt, pSubplan, (SLogicNode*)pChild, func, pInfo)) {
return true;
}
}
return NULL;
}
static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag, FSplFindSplitNode func, void* pInfo) {
if (!SPLIT_FLAG_TEST_MASK(pSubplan->splitFlag, flag)) {
if (func(pCxt, pSubplan, pInfo)) {
if (splMatchByNode(pCxt, pSubplan, pSubplan->pNode, func, pInfo)) {
return true;
}
}
@ -110,6 +126,11 @@ static bool splMatch(SSplitContext* pCxt, SLogicSubplan* pSubplan, int32_t flag,
return false;
}
static void splSetParent(SLogicNode* pNode) {
SNode* pChild = NULL;
FOREACH(pChild, pNode->pChildren) { ((SLogicNode*)pChild)->pParent = pNode; }
}
typedef struct SStableSplitInfo {
SLogicNode* pSplitNode;
SLogicSubplan* pSubplan;
@ -136,11 +157,21 @@ static bool stbSplHasMultiTbScan(bool streamQuery, SLogicNode* pNode) {
return false;
}
SNode* pChild = nodesListGetNode(pNode->pChildren, 0);
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pChild)) {
if (1 != LIST_LENGTH(((SLogicNode*)pChild)->pChildren)) {
return false;
}
pChild = nodesListGetNode(((SLogicNode*)pChild)->pChildren, 0);
}
return (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild) && stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pChild));
}
static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
switch (nodeType(pNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
// case QUERY_NODE_LOGIC_PLAN_JOIN:
// return !(((SJoinLogicNode*)pNode)->isSingleTableJoin);
case QUERY_NODE_LOGIC_PLAN_AGG:
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: {
@ -152,35 +183,20 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
}
case QUERY_NODE_LOGIC_PLAN_SORT:
return stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_SCAN:
return stbSplIsMultiTbScan(streamQuery, (SScanLogicNode*)pNode);
default:
break;
}
return false;
}
static SLogicNode* stbSplMatchByNode(bool streamQuery, SLogicNode* pNode) {
if (stbSplNeedSplit(streamQuery, pNode)) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = stbSplMatchByNode(streamQuery, (SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
}
return NULL;
}
static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SStableSplitInfo* pInfo) {
SLogicNode* pSplitNode = stbSplMatchByNode(pCxt->pPlanCxt->streamQuery, pSubplan->pNode);
if (NULL != pSplitNode) {
pInfo->pSplitNode = pSplitNode;
static bool stbSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
SStableSplitInfo* pInfo) {
if (stbSplNeedSplit(pCxt->pPlanCxt->streamQuery, pNode)) {
pInfo->pSplitNode = pNode;
pInfo->pSubplan = pSubplan;
return true;
}
return NULL != pSplitNode;
return false;
}
static int32_t stbSplRewriteFuns(const SNodeList* pFuncs, SNodeList** pPartialFuncs, SNodeList** pMergeFuncs) {
@ -258,6 +274,7 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
if (TSDB_CODE_SUCCESS == code) {
pMergeWindow->node.pTargets = pTargets;
pPartWin->node.pChildren = pChildren;
splSetParent((SLogicNode*)pPartWin);
code = stbSplRewriteFuns(pFunc, &pPartWin->pFuncs, &pMergeWindow->pFuncs);
}
int32_t index = 0;
@ -285,13 +302,24 @@ static int32_t stbSplCreatePartWindowNode(SWindowLogicNode* pMergeWindow, SLogic
return code;
}
static int32_t stbSplGetNumOfVgroups(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode)) {
return ((SScanLogicNode*)pNode)->pVgroupList->numOfVgroups;
} else {
if (1 == LIST_LENGTH(pNode->pChildren)) {
return stbSplGetNumOfVgroups((SLogicNode*)nodesListGetNode(pNode->pChildren, 0));
}
}
return 0;
}
static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
SNodeList* pMergeKeys, SLogicNode* pPartChild) {
SMergeLogicNode* pMerge = (SMergeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
if (NULL == pMerge) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pMerge->numOfChannels = ((SScanLogicNode*)nodesListGetNode(pPartChild->pChildren, 0))->pVgroupList->numOfVgroups;
pMerge->numOfChannels = stbSplGetNumOfVgroups(pPartChild);
pMerge->srcGroupId = pCxt->groupId;
pMerge->node.precision = pPartChild->precision;
pMerge->pMergeKeys = pMergeKeys;
@ -329,12 +357,12 @@ static int32_t stbSplCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pParent
return code;
}
static int32_t stbSplCreateMergeKeysForInterval(SNode* pWStartTs, SNodeList** pMergeKeys) {
static int32_t stbSplCreateMergeKeysByPrimaryKey(SNode* pPrimaryKey, SNodeList** pMergeKeys) {
SOrderByExprNode* pMergeKey = (SOrderByExprNode*)nodesMakeNode(QUERY_NODE_ORDER_BY_EXPR);
if (NULL == pMergeKey) {
return TSDB_CODE_OUT_OF_MEMORY;
}
pMergeKey->pExpr = nodesCloneNode(pWStartTs);
pMergeKey->pExpr = nodesCloneNode(pPrimaryKey);
if (NULL == pMergeKey->pExpr) {
nodesDestroyNode((SNode*)pMergeKey);
return TSDB_CODE_OUT_OF_MEMORY;
@ -351,7 +379,7 @@ static int32_t stbSplSplitIntervalForBatch(SSplitContext* pCxt, SStableSplitInfo
((SWindowLogicNode*)pPartWindow)->intervalAlgo = INTERVAL_ALGO_HASH;
((SWindowLogicNode*)pInfo->pSplitNode)->intervalAlgo = INTERVAL_ALGO_MERGE;
SNodeList* pMergeKeys = NULL;
code = stbSplCreateMergeKeysForInterval(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
code = stbSplCreateMergeKeysByPrimaryKey(((SWindowLogicNode*)pInfo->pSplitNode)->pTspk, &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, NULL, pInfo->pSplitNode, pMergeKeys, pPartWindow);
}
@ -439,6 +467,7 @@ static int32_t stbSplCreatePartAggNode(SAggLogicNode* pMergeAgg, SLogicNode** pO
pMergeAgg->node.pConditions = pConditions;
pMergeAgg->node.pTargets = pTargets;
pPartAgg->node.pChildren = pChildren;
splSetParent((SLogicNode*)pPartAgg);
code = stbSplRewriteFuns(pFunc, &pPartAgg->pAggFuncs, &pMergeAgg->pAggFuncs);
}
@ -553,6 +582,7 @@ static int32_t stbSplCreatePartSortNode(SSortLogicNode* pSort, SLogicNode** pOut
SNodeList* pMergeKeys = NULL;
if (TSDB_CODE_SUCCESS == code) {
pPartSort->node.pChildren = pChildren;
splSetParent((SLogicNode*)pPartSort);
pPartSort->pSortKeys = pSortKeys;
code = stbSplCreateMergeKeys(pPartSort->pSortKeys, pPartSort->node.pTargets, &pMergeKeys);
}
@ -592,6 +622,56 @@ static int32_t stbSplSplitScanNode(SSplitContext* pCxt, SStableSplitInfo* pInfo)
return code;
}
static SNode* stbSplFindPrimaryKeyFromScan(SScanLogicNode* pScan) {
SNode* pCol = NULL;
FOREACH(pCol, pScan->pScanCols) {
if (PRIMARYKEY_TIMESTAMP_COL_ID == ((SColumnNode*)pCol)->colId) {
return pCol;
}
}
return NULL;
}
static int32_t stbSplSplitScanNodeForJoin(SSplitContext* pCxt, SLogicSubplan* pSubplan, SScanLogicNode* pScan) {
SNodeList* pMergeKeys = NULL;
int32_t code = stbSplCreateMergeKeysByPrimaryKey(stbSplFindPrimaryKeyFromScan(pScan), &pMergeKeys);
if (TSDB_CODE_SUCCESS == code) {
code = stbSplCreateMergeNode(pCxt, pSubplan, (SLogicNode*)pScan, pMergeKeys, (SLogicNode*)pScan);
}
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&pSubplan->pChildren,
(SNode*)splCreateScanSubplan(pCxt, (SLogicNode*)pScan, SPLIT_FLAG_STABLE_SPLIT));
}
return code;
}
static int32_t stbSplSplitJoinNodeImpl(SSplitContext* pCxt, SLogicSubplan* pSubplan, SJoinLogicNode* pJoin) {
int32_t code = TSDB_CODE_SUCCESS;
SNode* pChild = NULL;
FOREACH(pChild, pJoin->node.pChildren) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pChild)) {
code = stbSplSplitScanNodeForJoin(pCxt, pSubplan, (SScanLogicNode*)pChild);
} else if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pChild)) {
code = stbSplSplitJoinNodeImpl(pCxt, pSubplan, (SJoinLogicNode*)pChild);
} else {
code = TSDB_CODE_PLAN_INTERNAL_ERROR;
}
if (TSDB_CODE_SUCCESS != code) {
break;
}
}
return code;
}
static int32_t stbSplSplitJoinNode(SSplitContext* pCxt, SStableSplitInfo* pInfo) {
int32_t code = stbSplSplitJoinNodeImpl(pCxt, pInfo->pSubplan, (SJoinLogicNode*)pInfo->pSplitNode);
if (TSDB_CODE_SUCCESS == code) {
pInfo->pSubplan->subplanType = SUBPLAN_TYPE_MERGE;
SPLIT_FLAG_SET_MASK(pInfo->pSubplan->splitFlag, SPLIT_FLAG_STABLE_SPLIT);
}
return code;
}
static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
if (pCxt->pPlanCxt->rSmaQuery) {
return TSDB_CODE_SUCCESS;
@ -604,6 +684,12 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
int32_t code = TSDB_CODE_SUCCESS;
switch (nodeType(info.pSplitNode)) {
case QUERY_NODE_LOGIC_PLAN_SCAN:
code = stbSplSplitScanNode(pCxt, &info);
break;
case QUERY_NODE_LOGIC_PLAN_JOIN:
code = stbSplSplitJoinNode(pCxt, &info);
break;
case QUERY_NODE_LOGIC_PLAN_AGG:
code = stbSplSplitAggNode(pCxt, &info);
break;
@ -613,9 +699,6 @@ static int32_t stableSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
case QUERY_NODE_LOGIC_PLAN_SORT:
code = stbSplSplitSortNode(pCxt, &info);
break;
case QUERY_NODE_LOGIC_PLAN_SCAN:
code = stbSplSplitScanNode(pCxt, &info);
break;
default:
break;
}
@ -631,7 +714,12 @@ typedef struct SSigTbJoinSplitInfo {
SLogicSubplan* pSubplan;
} SSigTbJoinSplitInfo;
static bool sigTbJoinSplNeedSplit(SJoinLogicNode* pJoin) {
static bool sigTbJoinSplNeedSplit(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_JOIN != nodeType(pNode)) {
return false;
}
SJoinLogicNode* pJoin = (SJoinLogicNode*)pNode;
if (!pJoin->isSingleTableJoin) {
return false;
}
@ -639,28 +727,15 @@ static bool sigTbJoinSplNeedSplit(SJoinLogicNode* pJoin) {
QUERY_NODE_LOGIC_PLAN_EXCHANGE != nodeType(nodesListGetNode(pJoin->node.pChildren, 1));
}
static SJoinLogicNode* sigTbJoinSplMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_JOIN == nodeType(pNode) && sigTbJoinSplNeedSplit((SJoinLogicNode*)pNode)) {
return (SJoinLogicNode*)pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SJoinLogicNode* pSplitNode = sigTbJoinSplMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
}
return NULL;
}
static bool sigTbJoinSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SSigTbJoinSplitInfo* pInfo) {
SJoinLogicNode* pJoin = sigTbJoinSplMatchByNode(pSubplan->pNode);
if (NULL != pJoin) {
pInfo->pJoin = pJoin;
pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pJoin->node.pChildren, 1);
static bool sigTbJoinSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
SSigTbJoinSplitInfo* pInfo) {
if (sigTbJoinSplNeedSplit(pNode)) {
pInfo->pJoin = (SJoinLogicNode*)pNode;
pInfo->pSplitNode = (SLogicNode*)nodesListGetNode(pNode->pChildren, 1);
pInfo->pSubplan = pSubplan;
return true;
}
return NULL != pJoin;
return false;
}
static int32_t singleTableJoinSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
@ -753,27 +828,14 @@ typedef struct SUnionAllSplitInfo {
SLogicSubplan* pSubplan;
} SUnionAllSplitInfo;
static SLogicNode* unAllSplMatchByNode(SLogicNode* pNode) {
static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
SUnionAllSplitInfo* pInfo) {
if (QUERY_NODE_LOGIC_PLAN_PROJECT == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = unAllSplMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
}
return NULL;
}
static bool unAllSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SUnionAllSplitInfo* pInfo) {
SLogicNode* pSplitNode = unAllSplMatchByNode(pSubplan->pNode);
if (NULL != pSplitNode) {
pInfo->pProject = (SProjectLogicNode*)pSplitNode;
pInfo->pProject = (SProjectLogicNode*)pNode;
pInfo->pSubplan = pSubplan;
return true;
}
return NULL != pSplitNode;
return false;
}
static int32_t unAllSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SProjectLogicNode* pProject) {
@ -828,20 +890,6 @@ typedef struct SUnionDistinctSplitInfo {
SLogicSubplan* pSubplan;
} SUnionDistinctSplitInfo;
static SLogicNode* unDistSplMatchByNode(SLogicNode* pNode) {
if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = unDistSplMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
}
return NULL;
}
static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SAggLogicNode* pAgg) {
SExchangeLogicNode* pExchange = (SExchangeLogicNode*)nodesMakeNode(QUERY_NODE_LOGIC_PLAN_EXCHANGE);
if (NULL == pExchange) {
@ -859,13 +907,14 @@ static int32_t unDistSplCreateExchangeNode(SSplitContext* pCxt, SLogicSubplan* p
return nodesListMakeAppend(&pAgg->node.pChildren, (SNode*)pExchange);
}
static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SUnionDistinctSplitInfo* pInfo) {
SLogicNode* pSplitNode = unDistSplMatchByNode(pSubplan->pNode);
if (NULL != pSplitNode) {
pInfo->pAgg = (SAggLogicNode*)pSplitNode;
static bool unDistSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
SUnionDistinctSplitInfo* pInfo) {
if (QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
pInfo->pAgg = (SAggLogicNode*)pNode;
pInfo->pSubplan = pSubplan;
return true;
}
return NULL != pSplitNode;
return false;
}
static int32_t unionDistinctSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
@ -888,27 +937,14 @@ typedef struct SSmaIndexSplitInfo {
SLogicSubplan* pSubplan;
} SSmaIndexSplitInfo;
static SLogicNode* smaIdxSplMatchByNode(SLogicNode* pNode) {
static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
SSmaIndexSplitInfo* pInfo) {
if (QUERY_NODE_LOGIC_PLAN_MERGE == nodeType(pNode) && LIST_LENGTH(pNode->pChildren) > 1) {
return pNode;
}
SNode* pChild;
FOREACH(pChild, pNode->pChildren) {
SLogicNode* pSplitNode = smaIdxSplMatchByNode((SLogicNode*)pChild);
if (NULL != pSplitNode) {
return pSplitNode;
}
}
return NULL;
}
static bool smaIdxSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SSmaIndexSplitInfo* pInfo) {
SLogicNode* pSplitNode = smaIdxSplMatchByNode(pSubplan->pNode);
if (NULL != pSplitNode) {
pInfo->pMerge = (SMergeLogicNode*)pSplitNode;
pInfo->pMerge = (SMergeLogicNode*)pNode;
pInfo->pSubplan = pSubplan;
return true;
}
return NULL != pSplitNode;
return false;
}
static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
@ -926,13 +962,47 @@ static int32_t smaIndexSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
return code;
}
typedef struct SQnodeSplitInfo {
SLogicNode* pSplitNode;
SLogicSubplan* pSubplan;
} SQnodeSplitInfo;
static bool qndSplFindSplitNode(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pNode,
SQnodeSplitInfo* pInfo) {
if (QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(pNode) && NULL != pNode->pParent) {
pInfo->pSplitNode = pNode;
pInfo->pSubplan = pSubplan;
return true;
}
return false;
}
static int32_t qnodeSplit(SSplitContext* pCxt, SLogicSubplan* pSubplan) {
if (QUERY_POLICY_QNODE != tsQueryPolicy) {
return TSDB_CODE_SUCCESS;
}
SQnodeSplitInfo info = {0};
if (!splMatch(pCxt, pSubplan, 0, (FSplFindSplitNode)qndSplFindSplitNode, &info)) {
return TSDB_CODE_SUCCESS;
}
int32_t code = splCreateExchangeNodeForSubplan(pCxt, info.pSubplan, info.pSplitNode, info.pSubplan->subplanType);
if (TSDB_CODE_SUCCESS == code) {
code = nodesListMakeStrictAppend(&info.pSubplan->pChildren, (SNode*)splCreateScanSubplan(pCxt, info.pSplitNode, 0));
}
++(pCxt->groupId);
pCxt->split = true;
return code;
}
// clang-format off
static const SSplitRule splitRuleSet[] = {
{.pName = "SuperTableSplit", .splitFunc = stableSplit},
{.pName = "SingleTableJoinSplit", .splitFunc = singleTableJoinSplit},
{.pName = "UnionAllSplit", .splitFunc = unionAllSplit},
{.pName = "UnionDistinctSplit", .splitFunc = unionDistinctSplit},
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit}
{.pName = "SmaIndexSplit", .splitFunc = smaIndexSplit},
{.pName = "QnodeSplit", .splitFunc = qnodeSplit}
};
// clang-format on

View File

@ -83,5 +83,7 @@ TEST_F(PlanGroupByTest, stable) {
run("SELECT COUNT(*) FROM st1 GROUP BY c1");
run("SELECT COUNT(*) FROM st1 PARTITION BY c2 GROUP BY c1");
run("SELECT SUM(c1) FROM st1 GROUP BY c2 HAVING SUM(c1) IS NOT NULL");
}

View File

@ -60,4 +60,6 @@ TEST_F(PlanIntervalTest, stable) {
run("SELECT COUNT(*) FROM st1 INTERVAL(10s)");
run("SELECT _WSTARTTS, COUNT(*) FROM st1 INTERVAL(10s)");
run("SELECT _WSTARTTS, COUNT(*) FROM st1 PARTITION BY TBNAME INTERVAL(10s)");
}

View File

@ -50,3 +50,9 @@ TEST_F(PlanJoinTest, multiJoin) {
run("SELECT t1.c1, t2.c1 FROM st1s1 t1 JOIN st1s2 t2 ON t1.ts = t2.ts JOIN st1s3 t3 ON t1.ts = t3.ts");
}
TEST_F(PlanJoinTest, stable) {
useDb("root", "test");
run("SELECT t1.c1, t2.c1 FROM st1 t1 JOIN st2 t2 ON t1.ts = t2.ts ");
}

View File

@ -49,4 +49,6 @@ TEST_F(PlanOrderByTest, stable) {
// ORDER BY key is not in the projection list
run("SELECT c2 FROM st1 ORDER BY c1");
run("SELECT c2 FROM st1 PARTITION BY c2 ORDER BY c1");
}

View File

@ -83,3 +83,10 @@ TEST_F(PlanOtherTest, delete) {
run("DELETE FROM st1 WHERE ts > now - 2d and ts < now - 1d AND tag1 = 10");
}
TEST_F(PlanOtherTest, queryPolicy) {
useDb("root", "test");
tsQueryPolicy = QUERY_POLICY_QNODE;
run("SELECT COUNT(*) FROM st1");
}

View File

@ -18,6 +18,10 @@
#include <gtest/gtest.h>
#define ALLOW_FORBID_FUNC
#include "planInt.h"
class PlannerTestBaseImpl;
struct TAOS_MULTI_BIND;