feat: group by tags optimize
This commit is contained in:
parent
3155a228e5
commit
b4057d657e
|
@ -1317,7 +1317,7 @@ int32_t tSerializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq
|
|||
int32_t tDeserializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
char queryStrId[TSDB_QUERY_ID_LEN];
|
||||
char queryStrId[TSDB_QUERY_ID_LEN];
|
||||
} SKillQueryReq;
|
||||
|
||||
int32_t tSerializeSKillQueryReq(void* buf, int32_t bufLen, SKillQueryReq* pReq);
|
||||
|
@ -2284,6 +2284,9 @@ typedef struct {
|
|||
int8_t igNotExists;
|
||||
} SMDropStreamReq;
|
||||
|
||||
int32_t tSerializeSMDropStreamReq(void* buf, int32_t bufLen, const SMDropStreamReq* pReq);
|
||||
int32_t tDeserializeSMDropStreamReq(void* buf, int32_t bufLen, SMDropStreamReq* pReq);
|
||||
|
||||
typedef struct {
|
||||
int8_t reserved;
|
||||
} SMDropStreamRsp;
|
||||
|
|
|
@ -4267,6 +4267,35 @@ int32_t tDeserializeSCMCreateStreamReq(void *buf, int32_t bufLen, SCMCreateStrea
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t tSerializeSMDropStreamReq(void *buf, int32_t bufLen, const SMDropStreamReq *pReq) {
|
||||
SEncoder encoder = {0};
|
||||
tEncoderInit(&encoder, buf, bufLen);
|
||||
|
||||
if (tStartEncode(&encoder) < 0) return -1;
|
||||
if (tEncodeCStr(&encoder, pReq->name) < 0) return -1;
|
||||
if (tEncodeI8(&encoder, pReq->igNotExists) < 0) return -1;
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
int32_t tlen = encoder.pos;
|
||||
tEncoderClear(&encoder);
|
||||
return tlen;
|
||||
}
|
||||
|
||||
int32_t tDeserializeSMDropStreamReq(void *buf, int32_t bufLen, SMDropStreamReq *pReq) {
|
||||
SDecoder decoder = {0};
|
||||
tDecoderInit(&decoder, buf, bufLen);
|
||||
|
||||
if (tStartDecode(&decoder) < 0) return -1;
|
||||
if (tDecodeCStrTo(&decoder, pReq->name) < 0) return -1;
|
||||
if (tDecodeI8(&decoder, &pReq->igNotExists) < 0) return -1;
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tFreeSCMCreateStreamReq(SCMCreateStreamReq *pReq) {
|
||||
taosMemoryFreeClear(pReq->sql);
|
||||
taosMemoryFreeClear(pReq->ast);
|
||||
|
|
|
@ -4081,8 +4081,12 @@ static int32_t translateCreateStream(STranslateContext* pCxt, SCreateStreamStmt*
|
|||
}
|
||||
|
||||
static int32_t translateDropStream(STranslateContext* pCxt, SDropStreamStmt* pStmt) {
|
||||
// todo
|
||||
return TSDB_CODE_SUCCESS;
|
||||
SMDropStreamReq dropReq = {0};
|
||||
SName name;
|
||||
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->streamName, strlen(pStmt->streamName));
|
||||
tNameGetFullDbName(&name, dropReq.name);
|
||||
dropReq.igNotExists = pStmt->ignoreNotExists;
|
||||
return buildCmdMsg(pCxt, TDMT_MND_DROP_STREAM, (FSerializeFunc)tSerializeSMDropStreamReq, &dropReq);
|
||||
}
|
||||
|
||||
static int32_t readFromFile(char* pName, int32_t* len, char** buf) {
|
||||
|
|
|
@ -159,7 +159,35 @@ TEST_F(ParserInitialDTest, dropSTable) {
|
|||
run("DROP STABLE st1");
|
||||
}
|
||||
|
||||
// todo DROP stream
|
||||
TEST_F(ParserInitialDTest, dropStream) {
|
||||
useDb("root", "test");
|
||||
|
||||
SMDropStreamReq expect = {0};
|
||||
|
||||
auto clearDropStreamReq = [&]() { memset(&expect, 0, sizeof(SMDropStreamReq)); };
|
||||
|
||||
auto setDropStreamReq = [&](const char* pStream, int8_t igNotExists = 0) {
|
||||
sprintf(expect.name, "0.%s", pStream);
|
||||
expect.igNotExists = igNotExists;
|
||||
};
|
||||
|
||||
setCheckDdlFunc([&](const SQuery* pQuery, ParserStage stage) {
|
||||
ASSERT_EQ(nodeType(pQuery->pRoot), QUERY_NODE_DROP_STREAM_STMT);
|
||||
SMDropStreamReq req = {0};
|
||||
ASSERT_TRUE(TSDB_CODE_SUCCESS == tDeserializeSMDropStreamReq(pQuery->pCmdMsg->pMsg, pQuery->pCmdMsg->msgLen, &req));
|
||||
|
||||
ASSERT_EQ(std::string(req.name), std::string(expect.name));
|
||||
ASSERT_EQ(req.igNotExists, expect.igNotExists);
|
||||
});
|
||||
|
||||
setDropStreamReq("s1");
|
||||
run("DROP STREAM s1");
|
||||
clearDropStreamReq();
|
||||
|
||||
setDropStreamReq("s2", 1);
|
||||
run("DROP STREAM IF EXISTS s2");
|
||||
clearDropStreamReq();
|
||||
}
|
||||
|
||||
TEST_F(ParserInitialDTest, dropTable) {
|
||||
useDb("root", "test");
|
||||
|
|
|
@ -1056,28 +1056,47 @@ static bool partTagsOptHasCol(SNodeList* pPartKeys) {
|
|||
return hasCol;
|
||||
}
|
||||
|
||||
static bool partTagsIsOptimizableNode(SLogicNode* pNode) {
|
||||
return ((QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode) ||
|
||||
(QUERY_NODE_LOGIC_PLAN_AGG == nodeType(pNode) && NULL != ((SAggLogicNode*)pNode)->pGroupKeys &&
|
||||
NULL != ((SAggLogicNode*)pNode)->pAggFuncs)) &&
|
||||
1 == LIST_LENGTH(pNode->pChildren) &&
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN == nodeType(nodesListGetNode(pNode->pChildren, 0)));
|
||||
}
|
||||
|
||||
static SNodeList* partTagsGetPartKeys(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
||||
return ((SPartitionLogicNode*)pNode)->pPartitionKeys;
|
||||
} else {
|
||||
return ((SAggLogicNode*)pNode)->pGroupKeys;
|
||||
}
|
||||
}
|
||||
|
||||
static bool partTagsOptMayBeOptimized(SLogicNode* pNode) {
|
||||
if (QUERY_NODE_LOGIC_PLAN_PARTITION != nodeType(pNode) || 1 != LIST_LENGTH(pNode->pChildren) ||
|
||||
QUERY_NODE_LOGIC_PLAN_SCAN != nodeType(nodesListGetNode(pNode->pChildren, 0))) {
|
||||
if (!partTagsIsOptimizableNode(pNode)) {
|
||||
return false;
|
||||
}
|
||||
|
||||
return !partTagsOptHasCol(((SPartitionLogicNode*)pNode)->pPartitionKeys);
|
||||
return !partTagsOptHasCol(partTagsGetPartKeys(pNode));
|
||||
}
|
||||
|
||||
static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSubplan) {
|
||||
SPartitionLogicNode* pPart =
|
||||
(SPartitionLogicNode*)optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized);
|
||||
if (NULL == pPart) {
|
||||
SLogicNode* pNode = optFindPossibleNode(pLogicSubplan->pNode, partTagsOptMayBeOptimized);
|
||||
if (NULL == pNode) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pPart->node.pChildren, 0);
|
||||
TSWAP(pPart->pPartitionKeys, pScan->pPartTags);
|
||||
int32_t code = replaceLogicNode(pLogicSubplan, (SLogicNode*)pPart, (SLogicNode*)pScan);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
NODES_CLEAR_LIST(pPart->node.pChildren);
|
||||
nodesDestroyNode((SNode*)pPart);
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
SScanLogicNode* pScan = (SScanLogicNode*)nodesListGetNode(pNode->pChildren, 0);
|
||||
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
||||
TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pPartTags);
|
||||
int32_t code = replaceLogicNode(pLogicSubplan, pNode, (SLogicNode*)pScan);
|
||||
if (TSDB_CODE_SUCCESS == code) {
|
||||
NODES_CLEAR_LIST(pNode->pChildren);
|
||||
nodesDestroyNode((SNode*)pNode);
|
||||
}
|
||||
} else {
|
||||
TSWAP(((SAggLogicNode*)pNode)->pGroupKeys, pScan->pPartTags);
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
@ -1088,7 +1107,7 @@ static const SOptimizeRule optimizeRuleSet[] = {
|
|||
{.pName = "ConditionPushDown", .optimizeFunc = cpdOptimize},
|
||||
{.pName = "OrderByPrimaryKey", .optimizeFunc = opkOptimize},
|
||||
{.pName = "SmaIndex", .optimizeFunc = smaOptimize},
|
||||
{.pName = "PartitionByTags", .optimizeFunc = partTagsOptimize}
|
||||
{.pName = "PartitionByTags", .optimizeFunc = partTagsOptimize},
|
||||
};
|
||||
// clang-format on
|
||||
|
||||
|
|
|
@ -45,10 +45,17 @@ TEST_F(PlanOptimizeTest, ConditionPushDown) {
|
|||
TEST_F(PlanOptimizeTest, orderByPrimaryKey) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT * FROM t1 ORDER BY ts");
|
||||
run("SELECT * FROM t1 ORDER BY ts DESC");
|
||||
run("SELECT c1 FROM t1 ORDER BY ts");
|
||||
|
||||
run("SELECT c1 FROM t1 ORDER BY ts DESC");
|
||||
|
||||
run("SELECT COUNT(*) FROM t1 INTERVAL(10S) ORDER BY _WSTARTTS DESC");
|
||||
}
|
||||
|
||||
TEST_F(PlanOptimizeTest, PartitionTags) {
|
||||
useDb("root", "test");
|
||||
|
||||
run("SELECT c1 FROM st1 PARTITION BY tag1");
|
||||
|
||||
run("SELECT SUM(c1) FROM st1 GROUP BY tag1");
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue