[td-10564] Add test case and fix bug in generated log query plan.
This commit is contained in:
parent
94b2e2ddc1
commit
6fedb60ec2
|
@ -107,6 +107,15 @@ typedef struct SExprInfo {
|
|||
struct tExprNode *pExpr;
|
||||
} SExprInfo;
|
||||
|
||||
typedef struct SStateWindow {
|
||||
SColumn col;
|
||||
} SStateWindow;
|
||||
|
||||
typedef struct SSessionWindow {
|
||||
int64_t gap; // gap between two session window(in microseconds)
|
||||
SColumn col;
|
||||
} SSessionWindow;
|
||||
|
||||
#define QUERY_ASC_FORWARD_STEP 1
|
||||
#define QUERY_DESC_FORWARD_STEP -1
|
||||
|
||||
|
|
|
@ -491,15 +491,6 @@ typedef struct SInterval {
|
|||
int64_t offset;
|
||||
} SInterval;
|
||||
|
||||
typedef struct SSessionWindow {
|
||||
int64_t gap; // gap between two session window(in microseconds)
|
||||
int32_t primaryColId; // primary timestamp column
|
||||
} SSessionWindow;
|
||||
|
||||
typedef struct SStateWindow {
|
||||
int32_t columnId;
|
||||
} SStateWindow;
|
||||
|
||||
typedef struct {
|
||||
SMsgHead head;
|
||||
char version[TSDB_VERSION_LEN];
|
||||
|
@ -524,7 +515,7 @@ typedef struct {
|
|||
int16_t orderColId;
|
||||
int16_t numOfCols; // the number of columns will be load from vnode
|
||||
SInterval interval;
|
||||
SSessionWindow sw; // session window
|
||||
// SSessionWindow sw; // session window
|
||||
uint16_t tagCondLen; // tag length in current query
|
||||
uint16_t colCondLen; // column length in current query
|
||||
int16_t numOfGroupCols; // num of group by columns
|
||||
|
|
|
@ -48,10 +48,10 @@ bool qIsValidUdf(SArray* pUdfInfo, const char* name, int32_t len, int32_t* funct
|
|||
|
||||
bool qIsAggregateFunction(const char* functionName) {
|
||||
assert(functionName != NULL);
|
||||
bool scalefunc = false;
|
||||
qIsBuiltinFunction(functionName, strlen(functionName), &scalefunc);
|
||||
bool scalarfunc = false;
|
||||
qIsBuiltinFunction(functionName, strlen(functionName), &scalarfunc);
|
||||
|
||||
return !scalefunc;
|
||||
return !scalarfunc;
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -763,6 +763,7 @@ int32_t validateSessionNode(SQueryStmtInfo *pQueryInfo, SSessionWindowVal* pSess
|
|||
const char* msg2 = "only one type time window allowed";
|
||||
const char* msg3 = "invalid column name";
|
||||
const char* msg4 = "invalid time window";
|
||||
const char* msg5 = "only the primary time stamp column can be used in session window";
|
||||
|
||||
// no session window
|
||||
if (!TPARSER_HAS_TOKEN(pSession->gap)) {
|
||||
|
@ -795,18 +796,22 @@ int32_t validateSessionNode(SQueryStmtInfo *pQueryInfo, SSessionWindowVal* pSess
|
|||
}
|
||||
|
||||
if (index.columnIndex != PRIMARYKEY_TIMESTAMP_COL_ID) {
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg3);
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg5);
|
||||
}
|
||||
|
||||
pQueryInfo->sessionWindow.primaryColId = PRIMARYKEY_TIMESTAMP_COL_ID;
|
||||
STableMetaInfo* pTableMetaInfo = getMetaInfo(pQueryInfo, index.tableIndex);
|
||||
STableMeta* pTableMeta = pTableMetaInfo->pTableMeta;
|
||||
|
||||
SSchema* pSchema = getOneColumnSchema(pTableMeta, index.columnIndex);
|
||||
pQueryInfo->sessionWindow.col = createColumn(pTableMetaInfo->pTableMeta->uid, pTableMetaInfo->aliasName, index.type, pSchema);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
// parse the window_state
|
||||
int32_t validateStateWindowNode(SQueryStmtInfo *pQueryInfo, SWindowStateVal* pWindowState, SMsgBuf* pMsgBuf) {
|
||||
const char* msg1 = "invalid column name";
|
||||
const char* msg2 = "invalid column type";
|
||||
const char* msg3 = "not support state_window with group by ";
|
||||
const char* msg2 = "invalid column type to create state window";
|
||||
const char* msg3 = "not support state_window with group by";
|
||||
const char* msg4 = "function not support for super table query";
|
||||
const char* msg5 = "not support state_window on tag column";
|
||||
|
||||
|
@ -836,22 +841,15 @@ int32_t validateStateWindowNode(SQueryStmtInfo *pQueryInfo, SWindowStateVal* pWi
|
|||
return buildInvalidOperationMsg(pMsgBuf, msg5);
|
||||
}
|
||||
|
||||
if (pGroupExpr->columnInfo == NULL) {
|
||||
pGroupExpr->columnInfo = taosArrayInit(4, sizeof(SColIndex));
|
||||
}
|
||||
|
||||
SSchema* pSchema = getOneColumnSchema(pTableMeta, index.columnIndex);
|
||||
if (pSchema->type == TSDB_DATA_TYPE_TIMESTAMP || IS_FLOAT_TYPE(pSchema->type)) {
|
||||
return buildInvalidOperationMsg(pMsgBuf, msg2);
|
||||
}
|
||||
|
||||
columnListInsert(pQueryInfo->colList, pTableMeta->uid, pSchema, TSDB_COL_NORMAL);
|
||||
SColIndex colIndex = { .colIndex = index.columnIndex, .flag = TSDB_COL_NORMAL, .colId = pSchema->colId };
|
||||
|
||||
//TODO use group by routine? state window query not support stable query.
|
||||
taosArrayPush(pGroupExpr->columnInfo, &colIndex);
|
||||
pQueryInfo->stateWindow.col = createColumn(pTableMeta->uid, pTableMetaInfo->aliasName, index.type, pSchema);
|
||||
pQueryInfo->info.stateWindow = true;
|
||||
|
||||
columnListInsert(pQueryInfo->colList, pTableMeta->uid, pSchema, index.type);
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -3047,10 +3045,10 @@ int32_t validateSqlExpr(const tSqlExpr* pSqlExpr, SQueryStmtInfo *pQueryInfo, SM
|
|||
if (pLeft->type == SQL_NODE_SQLFUNCTION && pRight->type == SQL_NODE_SQLFUNCTION) {
|
||||
|
||||
char token[FUNCTIONS_NAME_MAX_LENGTH] = {0};
|
||||
tstrncpy(token, pLeft->Expr.operand.z, pLeft->Expr.operand.n);
|
||||
strncpy(token, pLeft->Expr.operand.z, pLeft->Expr.operand.n);
|
||||
bool agg1 = qIsAggregateFunction(token);
|
||||
|
||||
tstrncpy(token, pRight->Expr.operand.z, pRight->Expr.operand.n);
|
||||
strncpy(token, pRight->Expr.operand.z, pRight->Expr.operand.n);
|
||||
bool agg2 = qIsAggregateFunction(token);
|
||||
|
||||
if (agg1 != agg2) {
|
||||
|
|
|
@ -167,7 +167,7 @@ TEST(testCase, planner_test) {
|
|||
TEST(testCase, displayPlan) {
|
||||
// generateLogicplan("select count(*) from `t.1abc`");
|
||||
// generateLogicplan("select count(*)+ 22 from `t.1abc`");
|
||||
// generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h)");
|
||||
// generateLogicplan("select count(*)+ 22 from `t.1abc` interval(1h, 20s) sliding(10m) limit 20,30");
|
||||
// generateLogicplan("select count(*) from `t.1abc` group by a");
|
||||
// generateLogicplan("select count(A+B) from `t.1abc` group by a");
|
||||
// generateLogicplan("select count(length(a)+b) from `t.1abc` group by a");
|
||||
|
@ -175,7 +175,9 @@ TEST(testCase, displayPlan) {
|
|||
// generateLogicplan("select count(*),sum(a),avg(b),min(a+b)+99 from `t.1abc`");
|
||||
// generateLogicplan("select count(*), min(a) + 99 from `t.1abc`");
|
||||
// generateLogicplan("select count(length(count(*) + 22)) from `t.1abc`");
|
||||
generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc`");
|
||||
// generateLogicplan("select concat(concat(a,b), concat(a,b)) from `t.1abc` limit 20");
|
||||
// generateLogicplan("select count(*), first(a), last(b) from `t.1abc` state_window(a)");
|
||||
generateLogicplan("select count(*), first(a), last(b) from `t.1abc` session(ts, 20s)");
|
||||
|
||||
// order by + group by column + limit offset + fill
|
||||
|
||||
|
|
|
@ -95,8 +95,7 @@ int32_t qCreateQueryJob(const struct SQueryDistPlanNode* pPhyNode, struct SQuery
|
|||
//======================================================================================================================
|
||||
|
||||
static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPlanNode** prev, int32_t numOfPrev,
|
||||
SExprInfo** pExpr, int32_t numOfOutput, SQueryTableInfo* pTableInfo,
|
||||
void* pExtInfo) {
|
||||
SExprInfo** pExpr, int32_t numOfOutput, SQueryTableInfo* pTableInfo, void* pExtInfo) {
|
||||
SQueryPlanNode* pNode = calloc(1, sizeof(SQueryPlanNode));
|
||||
|
||||
pNode->info.type = type;
|
||||
|
@ -134,6 +133,20 @@ static SQueryPlanNode* createQueryNode(int32_t type, const char* name, SQueryPla
|
|||
break;
|
||||
}
|
||||
|
||||
case QNODE_STATEWINDOW: {
|
||||
SColumn* psw = calloc(1, sizeof(SColumn));
|
||||
pNode->pExtInfo = psw;
|
||||
memcpy(psw, pExtInfo, sizeof(SColumn));
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_SESSIONWINDOW: {
|
||||
SSessionWindow *pSessionWindow = calloc(1, sizeof(SSessionWindow));
|
||||
pNode->pExtInfo = pSessionWindow;
|
||||
memcpy(pSessionWindow, pExtInfo, sizeof(struct SSessionWindow));
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_GROUPBY: {
|
||||
SGroupbyExpr* p = (SGroupbyExpr*) pExtInfo;
|
||||
|
||||
|
@ -262,9 +275,9 @@ static SQueryPlanNode* doCreateQueryPlanForSingleTableImpl(SQueryStmtInfo* pQuer
|
|||
if (pQueryInfo->interval.interval > 0) {
|
||||
pNode = createQueryNode(QNODE_TIMEWINDOW, "TimeWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->interval);
|
||||
} else if (pQueryInfo->sessionWindow.gap > 0) {
|
||||
pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, NULL, 0, info, NULL);
|
||||
} else if (pQueryInfo->stateWindow.columnId > 0) {
|
||||
pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, NULL, 0, info, NULL);
|
||||
pNode = createQueryNode(QNODE_SESSIONWINDOW, "SessionWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->sessionWindow);
|
||||
} else if (pQueryInfo->stateWindow.col.info.colId > 0) {
|
||||
pNode = createQueryNode(QNODE_STATEWINDOW, "StateWindowAgg", &pNode, 1, p->pData, num, info, &pQueryInfo->stateWindow);
|
||||
} else if (numOfGroupCols != 0 && !pQueryInfo->groupbyExpr.groupbyTag) {
|
||||
pNode = createQueryNode(QNODE_GROUPBY, "Groupby", &pNode, 1, p->pData, num, info, &pQueryInfo->groupbyExpr);
|
||||
} else {
|
||||
|
@ -343,8 +356,8 @@ static void exprInfoPushDown(SQueryStmtInfo* pQueryInfo) {
|
|||
for (int32_t j = 0; j < taosArrayGetSize(p); ++j) {
|
||||
SExprInfo* pExpr = taosArrayGetP(p, j);
|
||||
|
||||
bool canPushDown = true;
|
||||
if (pExpr->pExpr->nodeType == TEXPR_FUNCTION_NODE && qIsAggregateFunction(pExpr->pExpr->_function.functionName)) {
|
||||
bool canPushDown = true;
|
||||
for (int32_t k = 0; k < taosArrayGetSize(pNext); ++k) {
|
||||
SExprInfo* pNextLevelExpr = taosArrayGetP(pNext, k);
|
||||
if (pExpr->base.pColumns->info.colId == pNextLevelExpr->base.resSchema.colId) {
|
||||
|
@ -353,14 +366,14 @@ static void exprInfoPushDown(SQueryStmtInfo* pQueryInfo) {
|
|||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (canPushDown) {
|
||||
taosArrayInsert(pNext, j, &pExpr);
|
||||
taosArrayRemove(p, j);
|
||||
if (canPushDown) {
|
||||
taosArrayInsert(pNext, j, &pExpr);
|
||||
taosArrayRemove(p, j);
|
||||
|
||||
// add the project function in level of "i"
|
||||
|
||||
// todo add the project function in level of "i"
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -390,7 +403,7 @@ SArray* createQueryPlanImpl(SQueryStmtInfo* pQueryInfo) {
|
|||
uint64_t uid = pTableMetaInfo->pTableMeta->uid;
|
||||
|
||||
SArray* exprList = taosArrayInit(4, POINTER_BYTES);
|
||||
if (copyExprInfoList(exprList, pQueryInfo->exprList, uid, true) != 0) {
|
||||
if (copyExprInfoList(exprList, pQueryInfo->exprList[0], uid, true) != 0) {
|
||||
terrno = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
// dropAllExprInfo(exprList);
|
||||
exit(-1);
|
||||
|
@ -558,6 +571,46 @@ static int32_t doPrintPlan(char* buf, SQueryPlanNode* pQueryNode, int32_t level,
|
|||
break;
|
||||
}
|
||||
|
||||
case QNODE_STATEWINDOW: {
|
||||
for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
|
||||
SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i);
|
||||
SSqlExpr* pExpr = &pExprInfo->base;
|
||||
len += sprintf(buf + len,"%s [%s #%d]", pExpr->token, pExpr->resSchema.name, pExpr->resSchema.colId);
|
||||
if (i < pQueryNode->numOfExpr - 1) {
|
||||
len1 = sprintf(buf + len,", ");
|
||||
len += len1;
|
||||
}
|
||||
}
|
||||
|
||||
len1 = sprintf(buf + len,") ");
|
||||
len += len1;
|
||||
|
||||
SColumn* pCol = pQueryNode->pExtInfo;
|
||||
len1 = sprintf(buf + len, "col:%s #%d\n", pCol->name, pCol->info.colId);
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_SESSIONWINDOW: {
|
||||
for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
|
||||
SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i);
|
||||
SSqlExpr* pExpr = &pExprInfo->base;
|
||||
len += sprintf(buf + len,"%s [%s #%d]", pExpr->token, pExpr->resSchema.name, pExpr->resSchema.colId);
|
||||
if (i < pQueryNode->numOfExpr - 1) {
|
||||
len1 = sprintf(buf + len,", ");
|
||||
len += len1;
|
||||
}
|
||||
}
|
||||
|
||||
len1 = sprintf(buf + len,") ");
|
||||
len += len1;
|
||||
|
||||
struct SSessionWindow* ps = pQueryNode->pExtInfo;
|
||||
len1 = sprintf(buf + len, "col:[%s #%d], gap:%"PRId64" (ms) \n", ps->col.name, ps->col.info.colId, ps->gap);
|
||||
len += len1;
|
||||
break;
|
||||
}
|
||||
|
||||
case QNODE_GROUPBY: { // todo hide the invisible column
|
||||
for(int32_t i = 0; i < pQueryNode->numOfExpr; ++i) {
|
||||
SExprInfo* pExprInfo = taosArrayGetP(pQueryNode->pExpr, i);
|
||||
|
|
Loading…
Reference in New Issue