[td-10564] Fix bug in create logic plan.
This commit is contained in:
parent
f0b71c971a
commit
7c9d760026
|
@ -495,6 +495,10 @@ typedef struct SSessionWindow {
|
|||
int32_t primaryColId; // primary timestamp column
|
||||
} SSessionWindow;
|
||||
|
||||
typedef struct SStateWindow {
|
||||
int32_t columnId;
|
||||
} SStateWindow;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
char version[TSDB_VERSION_LEN];
|
||||
|
|
|
@ -259,7 +259,7 @@ int32_t qIsBuiltinFunction(const char* name, int32_t len, bool* scalarFunction);
|
|||
|
||||
bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* functionId);
|
||||
|
||||
const char* qGetFunctionName(int32_t functionId);
|
||||
bool qIsAggregateFunction(const char* functionName);
|
||||
|
||||
tExprNode* exprTreeFromBinary(const void* data, size_t size);
|
||||
|
||||
|
|
|
@ -77,6 +77,7 @@ typedef struct SQueryStmtInfo {
|
|||
STimeWindow window; // the whole query time window
|
||||
SInterval interval; // tumble time window
|
||||
SSessionWindow sessionWindow; // session time window
|
||||
SStateWindow stateWindow; // state window query
|
||||
SGroupbyExpr groupbyExpr; // groupby tags info
|
||||
SArray * colList; // SArray<SColumn*>
|
||||
SFieldInfo fieldsInfo;
|
||||
|
@ -180,6 +181,7 @@ STableMetaInfo* getMetaInfo(SQueryStmtInfo* pQueryInfo, int32_t tableIndex);
|
|||
SSchema *getOneColumnSchema(const STableMeta* pTableMeta, int32_t colIndex);
|
||||
|
||||
int32_t getNewResColId();
|
||||
void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -46,10 +46,15 @@ bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* funct
|
|||
return true;
|
||||
}
|
||||
|
||||
const char* qGetFunctionName(int32_t functionId) {
|
||||
bool qIsAggregateFunction(const char* functionName) {
|
||||
assert(functionName != NULL);
|
||||
bool scalefunc = false;
|
||||
qIsBuiltinFunction(functionName, strlen(functionName), &scalefunc);
|
||||
|
||||
return !scalefunc;
|
||||
}
|
||||
|
||||
|
||||
SAggFunctionInfo* qGetFunctionInfo(const char* name, int32_t len) {
|
||||
pthread_once(&functionHashTableInit, doInitFunctionHashTable);
|
||||
|
||||
|
|
|
@ -41,8 +41,6 @@ SSchema createSchema(uint8_t type, int16_t bytes, int16_t colId, const char* nam
|
|||
void setColumn(SColumn* pColumn, uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema);
|
||||
SColumn createColumn(uint64_t uid, const char* tableName, int8_t flag, const SSchema* pSchema);
|
||||
|
||||
SSourceParam addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn);
|
||||
|
||||
SInternalField* insertFieldInfo(SFieldInfo* pFieldInfo, int32_t index, SSchema* field);
|
||||
int32_t getNumOfFields(SFieldInfo* pFieldInfo);
|
||||
SInternalField* getInternalField(SFieldInfo* pFieldInfo, int32_t index);
|
||||
|
|
|
@ -544,7 +544,7 @@ SColumn createColumn(uint64_t uid, const char* tableName, int8_t flag, const SSc
|
|||
return c;
|
||||
}
|
||||
|
||||
SSourceParam addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn) {
|
||||
void addIntoSourceParam(SSourceParam* pSourceParam, tExprNode* pNode, SColumn* pColumn) {
|
||||
assert(pSourceParam != NULL);
|
||||
pSourceParam->num += 1;
|
||||
|
||||
|
|
|
@ -13,6 +13,7 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include <function.h>
|
||||
#include <gtest/gtest.h>
|
||||
#include <iostream>
|
||||
#pragma GCC diagnostic ignored "-Wwrite-strings"
|
||||
|
@ -66,65 +67,60 @@ void setTableMetaInfo(SQueryStmtInfo* pQueryInfo, SMetaReq *req) {
|
|||
}
|
||||
}
|
||||
|
||||
//TEST(testCase, planner_test) {
|
||||
// SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)");
|
||||
// ASSERT_EQ(info1.valid, true);
|
||||
//
|
||||
// char msg[128] = {0};
|
||||
// SMsgBuf buf;
|
||||
// buf.len = 128;
|
||||
// buf.buf = msg;
|
||||
//
|
||||
// SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.list), 0);
|
||||
// int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
// ASSERT_EQ(code, 0);
|
||||
//
|
||||
// SMetaReq req = {0};
|
||||
// int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
// ASSERT_EQ(ret, 0);
|
||||
// ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
//
|
||||
// SQueryStmtInfo* pQueryInfo = createQueryInfo();
|
||||
// setTableMetaInfo(pQueryInfo, &req);
|
||||
//
|
||||
// SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0);
|
||||
// ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
|
||||
// ASSERT_EQ(ret, 0);
|
||||
//
|
||||
// SArray* pExprList = pQueryInfo->exprList[0];
|
||||
// ASSERT_EQ(taosArrayGetSize(pExprList), 2);
|
||||
//
|
||||
// SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1);
|
||||
// ASSERT_EQ(p1->base.pColumns->uid, 110);
|
||||
// ASSERT_EQ(p1->base.numOfParams, 1);
|
||||
// ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
|
||||
// ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)");
|
||||
// ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_NORMAL);
|
||||
// ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)");
|
||||
// ASSERT_EQ(p1->base.interBytes, 16);
|
||||
//
|
||||
// ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE);
|
||||
// ASSERT_EQ(p1->pExpr->_function.functionId, FUNCTION_TOP);
|
||||
// ASSERT_TRUE(p1->pExpr->_node.pRight == NULL);
|
||||
//
|
||||
// tExprNode* pParam = p1->pExpr->_node.pLeft;
|
||||
//
|
||||
// ASSERT_EQ(pParam->nodeType, TEXPR_BINARYEXPR_NODE);
|
||||
// ASSERT_EQ(pParam->_node.optr, TSDB_BINARY_OP_DIVIDE);
|
||||
// ASSERT_EQ(pParam->_node.pLeft->nodeType, TEXPR_BINARYEXPR_NODE);
|
||||
// ASSERT_EQ(pParam->_node.pRight->nodeType, TEXPR_VALUE_NODE);
|
||||
//
|
||||
// ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
|
||||
// ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
|
||||
//
|
||||
// struct SQueryPlanNode* n = nullptr;
|
||||
// code = qCreateQueryPlan(pQueryInfo, &n);
|
||||
//
|
||||
// char* str = NULL;
|
||||
// qQueryPlanToString(n, &str);
|
||||
// printf("%s\n", str);
|
||||
//
|
||||
// destroyQueryInfo(pQueryInfo);
|
||||
// qParserClearupMetaRequestInfo(&req);
|
||||
// destroySqlInfo(&info1);
|
||||
//}
|
||||
TEST(testCase, planner_test) {
|
||||
SSqlInfo info1 = doGenerateAST("select top(a*b / 99, 20) from `t.1abc` interval(10s, 1s)");
|
||||
ASSERT_EQ(info1.valid, true);
|
||||
|
||||
char msg[128] = {0};
|
||||
SMsgBuf buf;
|
||||
buf.len = 128;
|
||||
buf.buf = msg;
|
||||
|
||||
SSqlNode* pNode = (SSqlNode*) taosArrayGetP(((SArray*)info1.list), 0);
|
||||
int32_t code = evaluateSqlNode(pNode, TSDB_TIME_PRECISION_NANO, &buf);
|
||||
ASSERT_EQ(code, 0);
|
||||
|
||||
SMetaReq req = {0};
|
||||
int32_t ret = qParserExtractRequestedMetaInfo(&info1, &req, msg, 128);
|
||||
ASSERT_EQ(ret, 0);
|
||||
ASSERT_EQ(taosArrayGetSize(req.pTableName), 1);
|
||||
|
||||
SQueryStmtInfo* pQueryInfo = createQueryInfo();
|
||||
setTableMetaInfo(pQueryInfo, &req);
|
||||
|
||||
SSqlNode* pSqlNode = (SSqlNode*)taosArrayGetP(info1.list, 0);
|
||||
ret = validateSqlNode(pSqlNode, pQueryInfo, &buf);
|
||||
ASSERT_EQ(ret, 0);
|
||||
|
||||
SArray* pExprList = pQueryInfo->exprList[0];
|
||||
ASSERT_EQ(taosArrayGetSize(pExprList), 2);
|
||||
|
||||
SExprInfo* p1 = (SExprInfo*) taosArrayGetP(pExprList, 1);
|
||||
ASSERT_EQ(p1->base.pColumns->uid, 110);
|
||||
ASSERT_EQ(p1->base.numOfParams, 1);
|
||||
ASSERT_EQ(p1->base.resSchema.type, TSDB_DATA_TYPE_DOUBLE);
|
||||
ASSERT_STRCASEEQ(p1->base.resSchema.name, "top(a*b / 99, 20)");
|
||||
ASSERT_EQ(p1->base.pColumns->flag, TSDB_COL_TMP);
|
||||
ASSERT_STRCASEEQ(p1->base.token, "top(a*b / 99, 20)");
|
||||
ASSERT_EQ(p1->base.interBytes, 16);
|
||||
|
||||
ASSERT_EQ(p1->pExpr->nodeType, TEXPR_FUNCTION_NODE);
|
||||
ASSERT_STREQ(p1->pExpr->_function.functionName, "top");
|
||||
|
||||
tExprNode* pParam = p1->pExpr->_function.pChild[0];
|
||||
|
||||
ASSERT_EQ(pParam->nodeType, TEXPR_COL_NODE);
|
||||
ASSERT_EQ(taosArrayGetSize(pQueryInfo->colList), 3);
|
||||
ASSERT_EQ(pQueryInfo->fieldsInfo.numOfOutput, 2);
|
||||
|
||||
struct SQueryPlanNode* n = nullptr;
|
||||
code = qCreateQueryPlan(pQueryInfo, &n);
|
||||
|
||||
char* str = NULL;
|
||||
qQueryPlanToString(n, &str);
|
||||
printf("%s\n", str);
|
||||
|
||||
destroyQueryInfo(pQueryInfo);
|
||||
qParserClearupMetaRequestInfo(&req);
|
||||
destroySqlInfo(&info1);
|
||||
}
|
|
@ -13,10 +13,10 @@
|
|||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#include "os.h"
|
||||
#include "plannerInt.h"
|
||||
#include "parser.h"
|
||||
#include "function.h"
|
||||
#include "os.h"
|
||||
#include "parser.h"
|
||||
#include "plannerInt.h"
|
||||
|
||||
#define QNODE_TAGSCAN 1
|
||||
#define QNODE_TABLESCAN 2
|
||||
|
@ -30,7 +30,8 @@
|
|||
#define QNODE_UNIONALL 10
|
||||
#define QNODE_TIMEWINDOW 11
|
||||
#define QNODE_SESSIONWINDOW 12
|
||||
#define QNODE_FILL 13
|
||||
#define QNODE_STATEWINDOW 13
|
||||
#define QNODE_FILL 14
|
||||
|
||||
typedef struct SFillEssInfo {
|
||||
int32_t fillType; // fill type
|
||||
|
@ -178,20 +179,19 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe
|
|||
int32_t numOfOutput = (int32_t) taosArrayGetSize(pExprs);
|
||||
pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pExprs->pData, numOfOutput, info, NULL);
|
||||
} else {
|
||||
STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0);
|
||||
|
||||
// table source column projection, generate the projection expr
|
||||
int32_t numOfCols = (int32_t) taosArrayGetSize(tableCols);
|
||||
SExprInfo** pExpr = calloc(numOfCols, POINTER_BYTES);
|
||||
|
||||
STableMetaInfo* pTableMetaInfo1 = getMetaInfo(pQueryInfo, 0);
|
||||
|
||||
for (int32_t i = 0; i < numOfCols; ++i) {
|
||||
SColumn* pCol = taosArrayGetP(tableCols, i);
|
||||
SColumnIndex index = {.tableIndex = 0, /*.columnIndex = pCol->columnIndex*/};
|
||||
SSchema* pSchema = getOneColumnSchema(pTableMetaInfo1->pTableMeta, i);
|
||||
|
||||
SSchema* pSchema = getOneColumnSchema(pTableMetaInfo->pTableMeta, i);
|
||||
SSchema resultSchema = *pSchema;
|
||||
SSourceParam param = {0};
|
||||
addIntoSourceParam(¶m, NULL, pCol);
|
||||
|
||||
SExprInfo* p = NULL;//createExprInfo(pTableMetaInfo1, FUNCTION_PRJ, &index, NULL, &resultSchema, 0);
|
||||
SExprInfo* p = createExprInfo(pTableMetaInfo1, "project", ¶m, pSchema, 0);
|
||||
pExpr[i] = p;
|
||||
}
|
||||
|
||||
|
@ -202,33 +202,69 @@ static SQueryPlanNode* doAddTableColumnNode(SQueryStmtInfo* pQueryInfo, STableMe
|
|||
return pNode;
|
||||
}
|
||||
|
||||
static SQueryPlanNode* doCreateQueryPlanForOneTableImpl(SQueryStmtInfo* pQueryInfo, SQueryPlanNode* pNode, SQueryTableInfo* info,
|
||||
SArray* pExprs) {
|
||||
static int32_t getFunctionLevel(SQueryStmtInfo* pQueryInfo) {
|
||||
int32_t n = 10;
|
||||
|
||||
int32_t level = 0;
|
||||
for(int32_t i = 0; i < n; ++i) {
|
||||
SArray* pList = pQueryInfo->exprList[i];
|
||||
if (taosArrayGetSize(pList) > 0) {
|
||||
level += 1;
|
||||
}
|
||||
}
|
||||
|
||||
return level;
|
||||
}
|
||||
|
||||
static SQueryPlanNode* createOneQueryPlanNode(SArray* p, SQueryPlanNode* pNode, SExprInfo* pExpr, SQueryTableInfo* info) {
|
||||
if (pExpr->pExpr->nodeType == TEXPR_FUNCTION_NODE) {
|
||||
bool aggregateFunc = qIsAggregateFunction(pExpr->pExpr->_function.functionName);
|
||||
if (aggregateFunc) {
|
||||
int32_t numOfOutput = (int32_t)taosArrayGetSize(p);
|
||||
return createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, p->pData, numOfOutput, info, NULL);
|
||||
} else {
|
||||
int32_t numOfOutput = (int32_t)taosArrayGetSize(p);
|
||||
return createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, numOfOutput, info, NULL);
|
||||
}
|
||||
} else {
|
||||
int32_t numOfOutput = (int32_t)taosArrayGetSize(p);
|
||||
return createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, p->pData, numOfOutput, info, NULL);
|
||||
}
|
||||
}
|
||||
|
||||
static SQueryPlanNode* doCreateQueryPlanForOneTableImpl(SQueryStmtInfo* pQueryInfo, SQueryPlanNode* pNode, SQueryTableInfo* info, SArray** pExprs) {
|
||||
// check for aggregation
|
||||
size_t numOfGroupCols = taosArrayGetSize(pQueryInfo->groupbyExpr.columnInfo);
|
||||
|
||||
if (pQueryInfo->interval.interval > 0) {
|
||||
int32_t numOfOutput = (int32_t)taosArrayGetSize(pExprs);
|
||||
int32_t level = getFunctionLevel(pQueryInfo);
|
||||
for(int32_t i = level - 1; i >= 0; --i) {
|
||||
SArray* p = pQueryInfo->exprList[i];
|
||||
SExprInfo* pExpr = (SExprInfo*)taosArrayGetP(p, 0);
|
||||
|
||||
pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, pExprs->pData, numOfOutput, info, &pQueryInfo->interval);
|
||||
if (numOfGroupCols != 0) {
|
||||
pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, pExprs->pData, numOfOutput, info, &pQueryInfo->groupbyExpr);
|
||||
}
|
||||
} else if (numOfGroupCols > 0) {
|
||||
int32_t numOfOutput = (int32_t)taosArrayGetSize(pExprs);
|
||||
pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, pExprs->pData, numOfOutput, info,
|
||||
&pQueryInfo->groupbyExpr);
|
||||
if (i == 0) {
|
||||
if (pQueryInfo->interval.interval > 0) {
|
||||
int32_t numOfOutput = (int32_t)taosArrayGetSize(p);
|
||||
pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, numOfOutput, info, &pQueryInfo->interval);
|
||||
} else if (pQueryInfo->sessionWindow.gap > 0) {
|
||||
pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, NULL, 0, info, NULL);
|
||||
} else if (pQueryInfo->info.simpleAgg) {
|
||||
int32_t numOfOutput = (int32_t)taosArrayGetSize(pExprs);
|
||||
pNode = createQueryNode(QNODE_AGGREGATE, "Aggregate", &pNode, 1, pExprs->pData, numOfOutput, info, NULL);
|
||||
} else if (pQueryInfo->stateWindow.columnId > 0) {
|
||||
pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, NULL, 0, info, NULL);
|
||||
} else {
|
||||
pNode = createOneQueryPlanNode(p, pNode, pExpr, info);
|
||||
}
|
||||
} else {
|
||||
pNode = createOneQueryPlanNode(p, pNode, pExpr, info);
|
||||
}
|
||||
}
|
||||
|
||||
if (pQueryInfo->havingFieldNum > 0 || pQueryInfo->info.arithmeticOnAgg) {
|
||||
if (numOfGroupCols != 0) {
|
||||
pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, NULL, 0, info, &pQueryInfo->groupbyExpr);
|
||||
}
|
||||
|
||||
if (pQueryInfo->havingFieldNum > 0) {
|
||||
// int32_t numOfExpr = (int32_t)taosArrayGetSize(pQueryInfo->exprList1);
|
||||
// pNode =
|
||||
// createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, info, NULL);
|
||||
// pNode = createQueryNode(QNODE_PROJECT, "Projection", &pNode, 1, pQueryInfo->exprList1->pData, numOfExpr, info,
|
||||
// NULL);
|
||||
}
|
||||
|
||||
if (pQueryInfo->fillType != TSDB_FILL_NONE) {
|
||||
|
@ -314,7 +350,7 @@ SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) {
|
|||
|
||||
// 3. add the join node here
|
||||
SQueryTableInfo info = {0};
|
||||
int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList);
|
||||
int32_t num = (int32_t) taosArrayGetSize(pQueryInfo->exprList[0]);
|
||||
SQueryPlanNode* pNode = createQueryNode(QNODE_JOIN, "Join", upstream->pData, pQueryInfo->numOfTables,
|
||||
pQueryInfo->exprList[0]->pData, num, &info, NULL);
|
||||
|
||||
|
@ -324,7 +360,7 @@ SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) {
|
|||
taosArrayPush(upstream, &pNode);
|
||||
} else { // only one table, normal query process
|
||||
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
|
||||
SQueryPlanNode* pNode = doCreateQueryPlanForOneTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList, pQueryInfo->colList);
|
||||
SQueryPlanNode* pNode = doCreateQueryPlanForOneTable(pQueryInfo, pTableMetaInfo, pQueryInfo->exprList[0], pQueryInfo->colList);
|
||||
upstream = taosArrayInit(5, POINTER_BYTES);
|
||||
taosArrayPush(upstream, &pNode);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue