feat: sma index optimize
This commit is contained in:
parent
317cc8fc13
commit
c31eb5231a
|
@ -25,9 +25,6 @@
|
|||
#include "tglobal.h"
|
||||
#include "ttime.h"
|
||||
|
||||
#define SMA_TABLE_NAME "#sma_table"
|
||||
#define SMA_COL_NAME_PREFIX "#sma_col_"
|
||||
|
||||
#define generateDealNodeErrMsg(pCxt, code, ...) \
|
||||
(pCxt->errCode = generateSyntaxErrMsg(&pCxt->msgBuf, code, ##__VA_ARGS__), DEAL_RES_ERROR)
|
||||
|
||||
|
@ -2029,22 +2026,7 @@ typedef struct SSmaIndexMatchFuncsCxt {
|
|||
bool match;
|
||||
} SSmaIndexMatchFuncsCxt;
|
||||
|
||||
static int32_t smaOptCreateSmaCol(SNode* pSmaFunc, int32_t index, SNode** pOutput) {
|
||||
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
pCol->tableId = tableId;
|
||||
pCol->tableType = TSDB_SUPER_TABLE;
|
||||
pCol->colId = index + 2; // skip timestamp primary col
|
||||
pCol->colType = COLUMN_TYPE_COLUMN;
|
||||
snprintf(pCol->colName, sizeof(pCol->colName), SMA_COL_NAME_PREFIX "%d", pCol->colId);
|
||||
strcpy(pCol->tableName, SMA_TABLE_NAME);
|
||||
strcpy(pCol->tableAlias, SMA_TABLE_NAME);
|
||||
pCol->node.resType = pSmaFunc->resType;
|
||||
strcpy(pCol->node.aliasName, pSmaFunc->aliasName);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
static int32_t collectSmaFunc(SSmaIndexMatchFuncsCxt* pCxt, int32_t index, SNode* pSmaFunc) {
|
||||
if (NULL == pCxt->pUseMap) {
|
||||
|
@ -2072,22 +2054,6 @@ static int32_t collectSmaFunc(SSmaIndexMatchFuncsCxt* pCxt, int32_t index, SNode
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t findSmaFunc(SSmaIndexMatchFuncsCxt* pCxt, SNode* pNode, bool* pFound) {
|
||||
if (!isAggFunc(pNode)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t index = 0;
|
||||
SNode* pSmaFunc = NULL;
|
||||
FOREACH(pSmaFunc, pCxt->pSmaFuncs) {
|
||||
if (nodesEqualNode(pSmaFunc, pNode)) {
|
||||
*pFound = true;
|
||||
return collectSmaFunc(pCxt, index, pSmaFunc);
|
||||
}
|
||||
++index;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static EDealRes matchSmaFuncsImpl(SNode* pNode, void* pContext) {
|
||||
SSmaIndexMatchFuncsCxt* pCxt = pContext;
|
||||
|
@ -2132,23 +2098,7 @@ static int32_t matchSmaFuncs(SSelectStmt* pSelect, STableIndexInfo* pIndex, SNod
|
|||
return cxt.errCode;
|
||||
}
|
||||
|
||||
static int32_t couldApplySmaIndex(STranslateContext* pCxt, SSelectStmt* pSelect, STableIndexInfo* pIndex,
|
||||
SNodeList** pFuncs, SNodeList** pCols) {
|
||||
if (!equalIntervalWindow((SIntervalWindowNode*)pSelect->pWindow, pSelect->pWhere, pIndex)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SNodeList* pSmaFuncs = NULL;
|
||||
int32_t code = nodesStringToList(pIndex->expr, &pSmaFuncs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pCxt->currClause = SQL_CLAUSE_SELECT;
|
||||
code = translateExprList(pCxt, pSmaFuncs);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = matchSmaFuncs(pSelect, pIndex, pSmaFuncs, pFuncs, pCols);
|
||||
}
|
||||
nodesDestroyList(pSmaFuncs);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
||||
typedef struct SSmaIndexRewriteFuncsCxt {
|
||||
int32_t errCode;
|
||||
|
|
|
@ -37,6 +37,7 @@ extern "C" {
|
|||
|
||||
int32_t generateUsageErrMsg(char* pBuf, int32_t len, int32_t errCode, ...);
|
||||
int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList);
|
||||
int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew);
|
||||
|
||||
int32_t createLogicPlan(SPlanContext* pCxt, SLogicSubplan** pLogicSubplan);
|
||||
int32_t optimizeLogicPlan(SPlanContext* pCxt, SLogicSubplan* pLogicSubplan);
|
||||
|
|
|
@ -751,35 +751,10 @@ static int32_t opkOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan)
|
|||
return opkOptimizeImpl(pCxt, pSort);
|
||||
}
|
||||
|
||||
static int32_t smaOptFindSmaFunc(SNodeList* pSmaFuncs, SNode* pFunc, SNode** pCol, bool* pFound) {
|
||||
int32_t index = 0;
|
||||
SNode* pSmaFunc = NULL;
|
||||
FOREACH(pSmaFunc, pSmaFuncs) {
|
||||
if (nodesEqualNode(pSmaFunc, pFunc)) {
|
||||
*pFound = true;
|
||||
return collectSmaFunc(pCxt, index, pSmaFunc);
|
||||
}
|
||||
++index;
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool smaOptCouldApplyIndex(SScanLogicNode* pScan, STableIndexInfo* pIndex) {
|
||||
if (pScan->interval != pIndex->interval || pScan->intervalUnit != pIndex->intervalUnit ||
|
||||
pScan->offset != pIndex->offset || pScan->sliding != pIndex->sliding ||
|
||||
pScan->slidingUnit != pIndex->slidingUnit) {
|
||||
return false;
|
||||
}
|
||||
// todo time range
|
||||
SNodeList* pFuncs = NULL;
|
||||
SNode* pFunc = NULL;
|
||||
FOREACH(pFunc, ((SWindowLogicNode*)pScan->node.pParent)->pFuncs) {}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
static bool smaOptMayBeOptimized(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode)) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(pNode) || NULL == pNode->pParent ||
|
||||
QUERY_NODE_LOGIC_PLAN_WINDOW != nodeType(pNode->pParent) ||
|
||||
WINDOW_TYPE_INTERVAL != ((SWindowLogicNode*)pNode->pParent)->winType) {
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -788,43 +763,44 @@ static bool smaOptMayBeOptimized(SLogicNode* pNode) {
|
|||
return false;
|
||||
}
|
||||
|
||||
int32_t size = taosArrayGetSize(pScan->pSmaIndexes);
|
||||
for (int32_t i = 0; i < size; ++i) {
|
||||
STableIndexInfo* pIndex = taosArrayGet(pScan->pSmaIndexes, i);
|
||||
return true;
|
||||
}
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
static int32_t smaOptCreateMerge(SNodeList* pTargets) {
|
||||
static int32_t smaOptCreateMerge(SLogicNode* pChild, SNodeList* pMergeKeys, SNodeList* pTargets, SLogicNode** pOutput) {
|
||||
SMergeLogicNode* pMerge = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_MERGE);
|
||||
if (NULL == pMerge) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
pMerge->node.precision = pPartChild->precision;
|
||||
pMerge->node.precision = pChild->precision;
|
||||
pMerge->pMergeKeys = pMergeKeys;
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
pMerge->pInputs = nodesCloneList(pPartChild->pTargets);
|
||||
pMerge->pInputs = nodesCloneList(pChild->pTargets);
|
||||
pMerge->node.pTargets = nodesCloneList(pTargets);
|
||||
if (NULL == pMerge->node.pTargets || NULL == pMerge->pInputs) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
nodesDestroyNode(pMerge);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
*pOutput = (SLogicNode*)pMerge;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t smaOptRecombinationNode(SLogicSubplan* pLogicSubplan, SLogicNode* pInterval, SLogicNode* pMerge,
|
||||
SLogicNode* pSmaScan) {
|
||||
int32_t code = nodesListMakeAppend(&pMerge->pChildren, pInterval);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = nodesListMakeAppend(&pMerge->pChildren, pSmaScan);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (NULL == pSubplan) {
|
||||
code = nodesListMakeAppend(&pSplitNode->pChildren, pMerge);
|
||||
} else {
|
||||
code = splReplaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
nodesDestroyNode(pMerge);
|
||||
code = replaceLogicNode(pLogicSubplan, pInterval, pMerge);
|
||||
pSmaScan->pParent = pMerge;
|
||||
pInterval->pParent = pMerge;
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pIndex, SNodeList* pCols,
|
||||
SScanLogicNode** pOutput) {
|
||||
SLogicNode** pOutput) {
|
||||
SScanLogicNode* pSmaScan = nodesMakeNode(QUERY_NODE_LOGIC_PLAN_SCAN);
|
||||
if (NULL == pSmaScan) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
|
@ -848,18 +824,131 @@ static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pInde
|
|||
pSmaScan->pVgroupList->vgroups[0].vgId = pIndex->dstVgId;
|
||||
memcpy(&(pSmaScan->pVgroupList->vgroups[0].epSet), &pIndex->epSet, sizeof(SEpSet));
|
||||
|
||||
*pOutput = pSmaScan;
|
||||
*pOutput = (SLogicNode*)pSmaScan;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SScanLogicNode* pScan) { return TSDB_CODE_SUCCESS; }
|
||||
static bool smaOptEqualInterval(SWindowLogicNode* pWindow, STableIndexInfo* pIndex) {
|
||||
if (pWindow->interval != pIndex->interval || pWindow->intervalUnit != pIndex->intervalUnit ||
|
||||
pWindow->offset != pIndex->offset || pWindow->sliding != pIndex->sliding ||
|
||||
pWindow->slidingUnit != pIndex->slidingUnit) {
|
||||
return false;
|
||||
}
|
||||
// todo time range
|
||||
return true;
|
||||
}
|
||||
|
||||
static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicNode* pLogicNode) {
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)optFindPossibleNode(pLogicNode, opkSortMayBeOptimized);
|
||||
// #define SMA_TABLE_NAME "#sma_table"
|
||||
// #define SMA_COL_NAME_PREFIX "#sma_col_"
|
||||
|
||||
static SNode* smaOptCreateSmaCol(SNode* pFunc, uint64_t tableId, int32_t colId) {
|
||||
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
return NULL;
|
||||
}
|
||||
pCol->tableId = tableId;
|
||||
pCol->tableType = TSDB_SUPER_TABLE;
|
||||
pCol->colId = colId;
|
||||
pCol->colType = COLUMN_TYPE_COLUMN;
|
||||
snprintf(pCol->colName, sizeof(pCol->colName), "#sma_col_%d", pCol->colId);
|
||||
// strcpy(pCol->tableName, SMA_TABLE_NAME);
|
||||
// strcpy(pCol->tableAlias, SMA_TABLE_NAME);
|
||||
pCol->node.resType = ((SExprNode*)pFunc)->resType;
|
||||
strcpy(pCol->node.aliasName, ((SExprNode*)pFunc)->aliasName);
|
||||
return (SNode*)pCol;
|
||||
}
|
||||
|
||||
static int32_t smaOptFindSmaFunc(SNode* pQueryFunc, SNodeList* pSmaFuncs) {
|
||||
int32_t index = 0;
|
||||
SNode* pSmaFunc = NULL;
|
||||
FOREACH(pSmaFunc, pSmaFuncs) {
|
||||
if (nodesEqualNode(pQueryFunc, pSmaFunc)) {
|
||||
return index;
|
||||
}
|
||||
++index;
|
||||
}
|
||||
return -1;
|
||||
}
|
||||
|
||||
static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeList* pSmaFuncs, SNodeList** pOutput,
|
||||
int32_t* pWStrartIndex) {
|
||||
SNodeList* pCols = NULL;
|
||||
SNode* pFunc = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t index = 0;
|
||||
FOREACH(pFunc, pFuncs) {
|
||||
if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pFunc)->funcType) {
|
||||
*pWStrartIndex = index;
|
||||
}
|
||||
int32_t smaFuncIndex = smaOptFindSmaFunc(pFunc, pSmaFuncs);
|
||||
if (smaFuncIndex < 0) {
|
||||
break;
|
||||
} else {
|
||||
code = nodesListMakeStrictAppend(&pCols, smaOptCreateSmaCol(pFunc, tableId, smaFuncIndex + 2));
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
++index;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
*pOutput = pCols;
|
||||
} else {
|
||||
nodesDestroyList(pCols);
|
||||
}
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t smaOptCouldApplyIndex(SWindowLogicNode* pWindow, STableIndexInfo* pIndex, SNodeList** pCols,
|
||||
int32_t* pWStrartIndex) {
|
||||
if (!smaOptEqualInterval(pWindow, pIndex)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SNodeList* pSmaFuncs = NULL;
|
||||
int32_t code = nodesStringToList(pIndex->expr, &pSmaFuncs);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = smaOptCreateSmaCols(pWindow->pFuncs, pIndex->dstTbUid, pSmaFuncs, pCols, pWStrartIndex);
|
||||
}
|
||||
nodesDestroyList(pSmaFuncs);
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t smaOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex,
|
||||
SNodeList* pSmaCols) {
|
||||
SLogicNode* pSmaScan = NULL;
|
||||
SLogicNode* pMerge = NULL;
|
||||
int32_t code = smaOptCreateSmaScan(pScan, pIndex, pSmaCols, &pSmaScan);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = smaOptCreateMerge(pScan->node.pParent, NULL, pSmaCols, &pMerge);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = smaOptRecombinationNode(pLogicSubplan, pScan->node.pParent, pMerge, pSmaScan);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan) {
|
||||
int32_t nindexes = taosArrayGetSize(pScan->pSmaIndexes);
|
||||
for (int32_t i = 0; i < nindexes; ++i) {
|
||||
STableIndexInfo* pIndex = taosArrayGet(pScan->pSmaIndexes, i);
|
||||
SNodeList* pSmaCols = NULL;
|
||||
int32_t wstrartIndex = -1;
|
||||
int32_t code = smaOptCouldApplyIndex((SWindowLogicNode*)pScan->node.pParent, pIndex, &pSmaCols, &wstrartIndex);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pSmaCols) {
|
||||
code = smaOptApplyIndex(pLogicSubplan, pScan, pIndex, pSmaCols);
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t smaOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, smaOptMayBeOptimized);
|
||||
if (NULL == pScan) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
return smaOptimizeImpl(pCxt, pScan);
|
||||
return smaOptimizeImpl(pCxt, pLogicSubplan, pScan);
|
||||
}
|
||||
|
||||
// clang-format off
|
||||
|
|
|
@ -80,29 +80,12 @@ static int32_t splCreateExchangeNode(SSplitContext* pCxt, SLogicNode* pChild, SE
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static int32_t splReplaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew) {
|
||||
if (NULL == pOld->pParent) {
|
||||
pSubplan->pNode = (SLogicNode*)pNew;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pOld->pParent->pChildren) {
|
||||
if (nodesEqualNode(pNode, pOld)) {
|
||||
REPLACE_NODE(pNew);
|
||||
pNew->pParent = pOld->pParent;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
|
||||
static int32_t splCreateExchangeNodeForSubplan(SSplitContext* pCxt, SLogicSubplan* pSubplan, SLogicNode* pSplitNode,
|
||||
ESubplanType subplanType) {
|
||||
SExchangeLogicNode* pExchange = NULL;
|
||||
int32_t code = splCreateExchangeNode(pCxt, pSplitNode, &pExchange);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = splReplaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange);
|
||||
code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pExchange);
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
pSubplan->subplanType = subplanType;
|
||||
|
@ -322,7 +305,7 @@ static int32_t stbSplCreateMergeNode(SSplitContext* pCxt, SLogicSubplan* pSubpla
|
|||
if (NULL == pSubplan) {
|
||||
code = nodesListMakeAppend(&pSplitNode->pChildren, pMerge);
|
||||
} else {
|
||||
code = splReplaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
|
||||
code = replaceLogicNode(pSubplan, pSplitNode, (SLogicNode*)pMerge);
|
||||
}
|
||||
}
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
|
|
|
@ -85,3 +85,20 @@ int32_t createColumnByRewriteExps(SNodeList* pExprs, SNodeList** pList) {
|
|||
}
|
||||
return cxt.errCode;
|
||||
}
|
||||
|
||||
int32_t replaceLogicNode(SLogicSubplan* pSubplan, SLogicNode* pOld, SLogicNode* pNew) {
|
||||
if (NULL == pOld->pParent) {
|
||||
pSubplan->pNode = (SLogicNode*)pNew;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SNode* pNode;
|
||||
FOREACH(pNode, pOld->pParent->pChildren) {
|
||||
if (nodesEqualNode(pNode, pOld)) {
|
||||
REPLACE_NODE(pNew);
|
||||
pNew->pParent = pOld->pParent;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
return TSDB_CODE_PLAN_INTERNAL_ERROR;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue