Merge pull request #13736 from taosdata/feature/3.0_wxy
feat: sma index optimize
This commit is contained in:
commit
0f99cc084a
|
@ -96,6 +96,7 @@ extern bool tsDeadLockKillQuery;
|
|||
|
||||
// query client
|
||||
extern int32_t tsQueryPolicy;
|
||||
extern int32_t tsQuerySmaOptimize;
|
||||
|
||||
// client
|
||||
extern int32_t tsMinSlidingTime;
|
||||
|
|
|
@ -86,6 +86,7 @@ bool tsSmlDataFormat =
|
|||
|
||||
// query
|
||||
int32_t tsQueryPolicy = 1;
|
||||
int32_t tsQuerySmaOptimize = 1;
|
||||
|
||||
/*
|
||||
* denote if the server needs to compress response message at the application layer to client, including query rsp,
|
||||
|
@ -331,6 +332,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
|
||||
|
@ -541,6 +543,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
|||
tsKeepOriginalColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
|
||||
tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32;
|
||||
tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32;
|
||||
tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32;
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -24,6 +24,9 @@ extern "C" {
|
|||
#include "parUtil.h"
|
||||
#include "parser.h"
|
||||
|
||||
#define QUERY_SMA_OPTIMIZE_DISABLE 0
|
||||
#define QUERY_SMA_OPTIMIZE_ENABLE 1
|
||||
|
||||
int32_t parseInsertSyntax(SParseContext* pContext, SQuery** pQuery);
|
||||
int32_t parseInsertSql(SParseContext* pContext, SQuery** pQuery);
|
||||
int32_t parse(SParseContext* pParseCxt, SQuery** pQuery);
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include "parInt.h"
|
||||
#include "parToken.h"
|
||||
#include "systable.h"
|
||||
#include "tglobal.h"
|
||||
|
||||
typedef void* (*FMalloc)(size_t);
|
||||
typedef void (*FFree)(void*);
|
||||
|
@ -116,7 +117,7 @@ static EDealRes collectMetaKeyFromFunction(SCollectMetaKeyFromExprCxt* pCxt, SFu
|
|||
}
|
||||
|
||||
static bool needGetTableIndex(SNode* pStmt) {
|
||||
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
|
||||
if (QUERY_SMA_OPTIMIZE_ENABLE == tsQuerySmaOptimize && QUERY_NODE_SELECT_STMT == nodeType(pStmt)) {
|
||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt;
|
||||
return (NULL != pSelect->pWindow && QUERY_NODE_INTERVAL_WINDOW == nodeType(pSelect->pWindow));
|
||||
}
|
||||
|
|
|
@ -1411,7 +1411,7 @@ static bool isSingleTable(SRealTableNode* pRealTable) {
|
|||
}
|
||||
|
||||
static int32_t setTableIndex(STranslateContext* pCxt, SName* pName, SRealTableNode* pRealTable) {
|
||||
if (pCxt->createStream) {
|
||||
if (pCxt->createStream || QUERY_SMA_OPTIMIZE_DISABLE == tsQuerySmaOptimize) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
if (NULL != pCxt->pCurrSelectStmt && NULL != pCxt->pCurrSelectStmt->pWindow &&
|
||||
|
|
|
@ -158,7 +158,9 @@ class MockCatalogServiceImpl {
|
|||
}
|
||||
*pIndexes = taosArrayInit(it->second.size(), sizeof(STableIndexInfo));
|
||||
for (const auto& index : it->second) {
|
||||
taosArrayPush(*pIndexes, &index);
|
||||
STableIndexInfo info;
|
||||
|
||||
taosArrayPush(*pIndexes, copyTableIndexInfo(&info, &index));
|
||||
}
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -316,6 +318,12 @@ class MockCatalogServiceImpl {
|
|||
pEpSet->inUse = 0;
|
||||
}
|
||||
|
||||
STableIndexInfo* copyTableIndexInfo(STableIndexInfo* pDst, const STableIndexInfo* pSrc) const {
|
||||
memcpy(pDst, pSrc, sizeof(STableIndexInfo));
|
||||
pDst->expr = strdup(pSrc->expr);
|
||||
return pDst;
|
||||
}
|
||||
|
||||
std::string toDbname(const std::string& dbFullName) const {
|
||||
std::string::size_type n = dbFullName.find(".");
|
||||
if (n == std::string::npos) {
|
||||
|
|
|
@ -17,6 +17,7 @@
|
|||
#include "functionMgt.h"
|
||||
#include "index.h"
|
||||
#include "planInt.h"
|
||||
#include "ttime.h"
|
||||
|
||||
#define OPTIMIZE_FLAG_MASK(n) (1 << n)
|
||||
|
||||
|
@ -816,7 +817,8 @@ static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pInde
|
|||
pSmaScan->dataRequired = FUNC_DATA_REQUIRED_DATA_LOAD;
|
||||
|
||||
pSmaScan->pVgroupList = taosMemoryCalloc(1, sizeof(SVgroupsInfo) + sizeof(SVgroupInfo));
|
||||
if (NULL == pSmaScan->pVgroupList) {
|
||||
pSmaScan->node.pTargets = nodesCloneList(pCols);
|
||||
if (NULL == pSmaScan->pVgroupList || NULL == pSmaScan->node.pTargets) {
|
||||
nodesDestroyNode(pSmaScan);
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -828,19 +830,26 @@ static int32_t smaOptCreateSmaScan(SScanLogicNode* pScan, STableIndexInfo* pInde
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool smaOptEqualInterval(SWindowLogicNode* pWindow, STableIndexInfo* pIndex) {
|
||||
static bool smaOptEqualInterval(SScanLogicNode* pScan, SWindowLogicNode* pWindow, STableIndexInfo* pIndex) {
|
||||
if (pWindow->interval != pIndex->interval || pWindow->intervalUnit != pIndex->intervalUnit ||
|
||||
pWindow->offset != pIndex->offset || pWindow->sliding != pIndex->sliding ||
|
||||
pWindow->slidingUnit != pIndex->slidingUnit) {
|
||||
return false;
|
||||
}
|
||||
// todo time range
|
||||
if (IS_TSWINDOW_SPECIFIED(pScan->scanRange)) {
|
||||
SInterval interval = {.interval = pIndex->interval,
|
||||
.intervalUnit = pIndex->intervalUnit,
|
||||
.offset = pIndex->offset,
|
||||
.offsetUnit = TIME_UNIT_MILLISECOND,
|
||||
.sliding = pIndex->sliding,
|
||||
.slidingUnit = pIndex->slidingUnit,
|
||||
.precision = pScan->node.precision};
|
||||
return (pScan->scanRange.skey == taosTimeTruncate(pScan->scanRange.skey, &interval, pScan->node.precision)) &&
|
||||
(pScan->scanRange.ekey + 1 == taosTimeTruncate(pScan->scanRange.ekey + 1, &interval, pScan->node.precision));
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// #define SMA_TABLE_NAME "#sma_table"
|
||||
// #define SMA_COL_NAME_PREFIX "#sma_col_"
|
||||
|
||||
static SNode* smaOptCreateSmaCol(SNode* pFunc, uint64_t tableId, int32_t colId) {
|
||||
SColumnNode* pCol = nodesMakeNode(QUERY_NODE_COLUMN);
|
||||
if (NULL == pCol) {
|
||||
|
@ -850,9 +859,7 @@ static SNode* smaOptCreateSmaCol(SNode* pFunc, uint64_t tableId, int32_t colId)
|
|||
pCol->tableType = TSDB_SUPER_TABLE;
|
||||
pCol->colId = colId;
|
||||
pCol->colType = COLUMN_TYPE_COLUMN;
|
||||
snprintf(pCol->colName, sizeof(pCol->colName), "#sma_col_%d", pCol->colId);
|
||||
// strcpy(pCol->tableName, SMA_TABLE_NAME);
|
||||
// strcpy(pCol->tableAlias, SMA_TABLE_NAME);
|
||||
strcpy(pCol->colName, ((SExprNode*)pFunc)->aliasName);
|
||||
pCol->node.resType = ((SExprNode*)pFunc)->resType;
|
||||
strcpy(pCol->node.aliasName, ((SExprNode*)pFunc)->aliasName);
|
||||
return (SNode*)pCol;
|
||||
|
@ -876,12 +883,13 @@ static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeLis
|
|||
SNode* pFunc = NULL;
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
int32_t index = 0;
|
||||
int32_t smaFuncIndex = -1;
|
||||
*pWStrartIndex = -1;
|
||||
FOREACH(pFunc, pFuncs) {
|
||||
if (FUNCTION_TYPE_WSTARTTS == ((SFunctionNode*)pFunc)->funcType) {
|
||||
*pWStrartIndex = index;
|
||||
}
|
||||
int32_t smaFuncIndex = smaOptFindSmaFunc(pFunc, pSmaFuncs);
|
||||
smaFuncIndex = smaOptFindSmaFunc(pFunc, pSmaFuncs);
|
||||
if (smaFuncIndex < 0) {
|
||||
break;
|
||||
} else {
|
||||
|
@ -893,7 +901,7 @@ static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeLis
|
|||
++index;
|
||||
}
|
||||
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
if (TSDB_CODE_SUCCESS == code && smaFuncIndex >= 0) {
|
||||
*pOutput = pCols;
|
||||
} else {
|
||||
nodesDestroyList(pCols);
|
||||
|
@ -902,9 +910,10 @@ static int32_t smaOptCreateSmaCols(SNodeList* pFuncs, uint64_t tableId, SNodeLis
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t smaOptCouldApplyIndex(SWindowLogicNode* pWindow, STableIndexInfo* pIndex, SNodeList** pCols,
|
||||
static int32_t smaOptCouldApplyIndex(SScanLogicNode* pScan, STableIndexInfo* pIndex, SNodeList** pCols,
|
||||
int32_t* pWStrartIndex) {
|
||||
if (!smaOptEqualInterval(pWindow, pIndex)) {
|
||||
SWindowLogicNode* pWindow = (SWindowLogicNode*)pScan->node.pParent;
|
||||
if (!smaOptEqualInterval(pScan, pWindow, pIndex)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
SNodeList* pSmaFuncs = NULL;
|
||||
|
@ -961,7 +970,7 @@ static int32_t smaOptRewriteInterval(SWindowLogicNode* pInterval, int32_t wstrar
|
|||
return smaOptCreateMergeKey(nodesListGetNode(pInterval->node.pTargets, wstrartIndex), pMergeKeys);
|
||||
}
|
||||
|
||||
static int32_t smaOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex,
|
||||
static int32_t smaOptApplyIndexExt(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex,
|
||||
SNodeList* pSmaCols, int32_t wstrartIndex) {
|
||||
SWindowLogicNode* pInterval = (SWindowLogicNode*)pScan->node.pParent;
|
||||
SNodeList* pMergeTargets = nodesCloneList(pInterval->node.pTargets);
|
||||
|
@ -984,6 +993,16 @@ static int32_t smaOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pS
|
|||
return code;
|
||||
}
|
||||
|
||||
static int32_t smaOptApplyIndex(SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan, STableIndexInfo* pIndex,
|
||||
SNodeList* pSmaCols, int32_t wstrartIndex) {
|
||||
SLogicNode* pSmaScan = NULL;
|
||||
int32_t code = smaOptCreateSmaScan(pScan, pIndex, pSmaCols, &pSmaScan);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
code = replaceLogicNode(pLogicSubplan, pScan->node.pParent, pSmaScan);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
static void smaOptDestroySmaIndex(void* p) { taosMemoryFree(((STableIndexInfo*)p)->expr); }
|
||||
|
||||
static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan, SScanLogicNode* pScan) {
|
||||
|
@ -993,7 +1012,7 @@ static int32_t smaOptimizeImpl(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubp
|
|||
STableIndexInfo* pIndex = taosArrayGet(pScan->pSmaIndexes, i);
|
||||
SNodeList* pSmaCols = NULL;
|
||||
int32_t wstrartIndex = -1;
|
||||
code = smaOptCouldApplyIndex((SWindowLogicNode*)pScan->node.pParent, pIndex, &pSmaCols, &wstrartIndex);
|
||||
code = smaOptCouldApplyIndex(pScan, pIndex, &pSmaCols, &wstrartIndex);
|
||||
if (TSDB_CODE_SUCCESS == code && NULL != pSmaCols) {
|
||||
code = smaOptApplyIndex(pLogicSubplan, pScan, pIndex, pSmaCols, wstrartIndex);
|
||||
taosArrayDestroyEx(pScan->pSmaIndexes, smaOptDestroySmaIndex);
|
||||
|
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "planTestUtil.h"
|
||||
#include "planner.h"
|
||||
#include "tglobal.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
|
@ -45,6 +46,14 @@ TEST_F(PlanOtherTest, createSmaIndex) {
|
|||
run("CREATE SMA INDEX idx1 ON t1 FUNCTION(MAX(c1), MIN(c3 + 10), SUM(c4)) INTERVAL(10s)");
|
||||
|
||||
run("SELECT SUM(c4) FROM t1 INTERVAL(10s)");
|
||||
|
||||
run("SELECT _WSTARTTS, MIN(c3 + 10) FROM t1 "
|
||||
"WHERE ts BETWEEN TIMESTAMP '2022-04-01 00:00:00' AND TIMESTAMP '2022-04-30 23:59:59.999' INTERVAL(10s)");
|
||||
|
||||
run("SELECT SUM(c4), MAX(c3) FROM t1 INTERVAL(10s)");
|
||||
|
||||
tsQuerySmaOptimize = 0;
|
||||
run("SELECT SUM(c4) FROM t1 INTERVAL(10s)");
|
||||
}
|
||||
|
||||
TEST_F(PlanOtherTest, explain) {
|
||||
|
|
|
@ -28,6 +28,12 @@ TEST_F(PlanSubqeuryTest, basic) {
|
|||
run("SELECT LAST(c1) FROM (SELECT * FROM t1)");
|
||||
|
||||
run("SELECT c1 FROM (SELECT TODAY() AS c1 FROM t1)");
|
||||
|
||||
run("SELECT NOW() FROM t1");
|
||||
|
||||
run("SELECT NOW() FROM (SELECT * FROM t1)");
|
||||
|
||||
// run("SELECT NOW() FROM (SELECT * FROM t1) ORDER BY ts");
|
||||
}
|
||||
|
||||
TEST_F(PlanSubqeuryTest, doubleGroupBy) {
|
||||
|
|
|
@ -2,20 +2,20 @@
|
|||
#======================b1-start===============
|
||||
|
||||
# ---- user
|
||||
./test.sh -f tsim/user/basic1.sim
|
||||
./test.sh -f tsim/user/pass_alter.sim
|
||||
./test.sh -f tsim/user/pass_len.sim
|
||||
./test.sh -f tsim/user/user_len.sim
|
||||
./test.sh -f tsim/user/privilege1.sim
|
||||
./test.sh -f tsim/user/privilege2.sim
|
||||
#./test.sh -f tsim/user/basic1.sim
|
||||
#./test.sh -f tsim/user/pass_alter.sim
|
||||
#./test.sh -f tsim/user/pass_len.sim
|
||||
#./test.sh -f tsim/user/user_len.sim
|
||||
#./test.sh -f tsim/user/privilege1.sim
|
||||
#./test.sh -f tsim/user/privilege2.sim#
|
||||
|
||||
# ---- db
|
||||
./test.sh -f tsim/db/create_all_options.sim
|
||||
./test.sh -f tsim/db/alter_option.sim
|
||||
./test.sh -f tsim/db/basic1.sim
|
||||
./test.sh -f tsim/db/basic2.sim
|
||||
./test.sh -f tsim/db/basic3.sim
|
||||
./test.sh -f tsim/db/basic6.sim
|
||||
## ---- db
|
||||
#./test.sh -f tsim/db/create_all_options.sim
|
||||
#./test.sh -f tsim/db/alter_option.sim
|
||||
#./test.sh -f tsim/db/basic1.sim
|
||||
#./test.sh -f tsim/db/basic2.sim
|
||||
#./test.sh -f tsim/db/basic3.sim
|
||||
#./test.sh -f tsim/db/basic6.sim
|
||||
./test.sh -f tsim/db/basic7.sim
|
||||
./test.sh -f tsim/db/error1.sim
|
||||
./test.sh -f tsim/db/taosdlog.sim
|
||||
|
|
Loading…
Reference in New Issue