Merge pull request #16015 from taosdata/fix/3.0_wxy
fix: improve create stream check
This commit is contained in:
commit
2040ab7872
|
@ -269,6 +269,7 @@ typedef struct SSelectStmt {
|
||||||
bool hasInterpFunc;
|
bool hasInterpFunc;
|
||||||
bool hasLastRowFunc;
|
bool hasLastRowFunc;
|
||||||
bool hasTimeLineFunc;
|
bool hasTimeLineFunc;
|
||||||
|
bool hasUdaf;
|
||||||
bool onlyHasKeepOrderFunc;
|
bool onlyHasKeepOrderFunc;
|
||||||
bool groupSort;
|
bool groupSort;
|
||||||
} SSelectStmt;
|
} SSelectStmt;
|
||||||
|
|
|
@ -346,7 +346,7 @@ static const SSysTableMeta perfsMeta[] = {
|
||||||
{TSDB_PERFS_TABLE_TOPICS, topicSchema, tListLen(topicSchema)},
|
{TSDB_PERFS_TABLE_TOPICS, topicSchema, tListLen(topicSchema)},
|
||||||
{TSDB_PERFS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)},
|
{TSDB_PERFS_TABLE_CONSUMERS, consumerSchema, tListLen(consumerSchema)},
|
||||||
{TSDB_PERFS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema)},
|
{TSDB_PERFS_TABLE_SUBSCRIPTIONS, subscriptionSchema, tListLen(subscriptionSchema)},
|
||||||
{TSDB_PERFS_TABLE_OFFSETS, offsetSchema, tListLen(offsetSchema)},
|
// {TSDB_PERFS_TABLE_OFFSETS, offsetSchema, tListLen(offsetSchema)},
|
||||||
{TSDB_PERFS_TABLE_TRANS, transSchema, tListLen(transSchema)},
|
{TSDB_PERFS_TABLE_TRANS, transSchema, tListLen(transSchema)},
|
||||||
{TSDB_PERFS_TABLE_SMAS, smaSchema, tListLen(smaSchema)},
|
{TSDB_PERFS_TABLE_SMAS, smaSchema, tListLen(smaSchema)},
|
||||||
{TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)},
|
{TSDB_PERFS_TABLE_STREAMS, streamSchema, tListLen(streamSchema)},
|
||||||
|
|
|
@ -1350,6 +1350,7 @@ static void setFuncClassification(SNode* pCurrStmt, SFunctionNode* pFunc) {
|
||||||
pSelect->hasInterpFunc = pSelect->hasInterpFunc ? true : (FUNCTION_TYPE_INTERP == pFunc->funcType);
|
pSelect->hasInterpFunc = pSelect->hasInterpFunc ? true : (FUNCTION_TYPE_INTERP == pFunc->funcType);
|
||||||
pSelect->hasLastRowFunc = pSelect->hasLastRowFunc ? true : (FUNCTION_TYPE_LAST_ROW == pFunc->funcType);
|
pSelect->hasLastRowFunc = pSelect->hasLastRowFunc ? true : (FUNCTION_TYPE_LAST_ROW == pFunc->funcType);
|
||||||
pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId);
|
pSelect->hasTimeLineFunc = pSelect->hasTimeLineFunc ? true : fmIsTimelineFunc(pFunc->funcId);
|
||||||
|
pSelect->hasUdaf = pSelect->hasUdaf ? true : fmIsUserDefinedFunc(pFunc->funcId) && fmIsAggFunc(pFunc->funcId);
|
||||||
pSelect->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc ? fmIsKeepOrderFunc(pFunc->funcId) : false;
|
pSelect->onlyHasKeepOrderFunc = pSelect->onlyHasKeepOrderFunc ? fmIsKeepOrderFunc(pFunc->funcId) : false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2644,6 +2645,11 @@ static int32_t translateInterp(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (NULL == pSelect->pRange || NULL == pSelect->pEvery || NULL == pSelect->pFill) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_INTERP_CLAUSE,
|
||||||
|
"Missing RANGE clause, EVERY clause or FILL clause");
|
||||||
|
}
|
||||||
|
|
||||||
int32_t code = translateExpr(pCxt, &pSelect->pRange);
|
int32_t code = translateExpr(pCxt, &pSelect->pRange);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = translateExpr(pCxt, &pSelect->pEvery);
|
code = translateExpr(pCxt, &pSelect->pEvery);
|
||||||
|
@ -4734,6 +4740,11 @@ static bool crossTableWithoutAggOper(SSelectStmt* pSelect) {
|
||||||
!isPartitionByTbname(pSelect->pPartitionByList);
|
!isPartitionByTbname(pSelect->pPartitionByList);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool crossTableWithUdaf(SSelectStmt* pSelect) {
|
||||||
|
return pSelect->hasUdaf && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
|
||||||
|
!isPartitionByTbname(pSelect->pPartitionByList);
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
|
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
|
||||||
if (NULL != pStmt->pOptions->pWatermark &&
|
if (NULL != pStmt->pOptions->pWatermark &&
|
||||||
(DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) {
|
(DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) {
|
||||||
|
@ -4785,7 +4796,8 @@ static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) {
|
||||||
|
|
||||||
static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
|
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
|
||||||
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect)) {
|
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
|
||||||
|
crossTableWithUdaf(pSelect)) {
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
||||||
}
|
}
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
|
@ -294,16 +294,6 @@ TEST_F(ParserSelectTest, intervalSemanticCheck) {
|
||||||
TEST_F(ParserSelectTest, interp) {
|
TEST_F(ParserSelectTest, interp) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
run("SELECT INTERP(c1) FROM t1");
|
|
||||||
|
|
||||||
run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00')");
|
|
||||||
|
|
||||||
run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') FILL(LINEAR)");
|
|
||||||
|
|
||||||
run("SELECT INTERP(c1) FROM t1 EVERY(5s)");
|
|
||||||
|
|
||||||
run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s)");
|
|
||||||
|
|
||||||
run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)");
|
run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1615,6 +1615,9 @@ static int32_t partTagsOptimize(SOptimizeContext* pCxt, SLogicSubplan* pLogicSub
|
||||||
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
if (QUERY_NODE_LOGIC_PLAN_PARTITION == nodeType(pNode)) {
|
||||||
TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pGroupTags);
|
TSWAP(((SPartitionLogicNode*)pNode)->pPartitionKeys, pScan->pGroupTags);
|
||||||
int32_t code = replaceLogicNode(pLogicSubplan, pNode, (SLogicNode*)pScan);
|
int32_t code = replaceLogicNode(pLogicSubplan, pNode, (SLogicNode*)pScan);
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = adjustLogicNodeDataRequirement((SLogicNode*)pScan, pNode->resultDataOrder);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
NODES_CLEAR_LIST(pNode->pChildren);
|
NODES_CLEAR_LIST(pNode->pChildren);
|
||||||
nodesDestroyNode((SNode*)pNode);
|
nodesDestroyNode((SNode*)pNode);
|
||||||
|
|
|
@ -93,8 +93,6 @@ TEST_F(PlanBasicTest, tailFunc) {
|
||||||
TEST_F(PlanBasicTest, interpFunc) {
|
TEST_F(PlanBasicTest, interpFunc) {
|
||||||
useDb("root", "test");
|
useDb("root", "test");
|
||||||
|
|
||||||
run("SELECT INTERP(c1) FROM t1");
|
|
||||||
|
|
||||||
run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)");
|
run("SELECT INTERP(c1) FROM t1 RANGE('2017-7-14 18:00:00', '2017-7-14 19:00:00') EVERY(5s) FILL(LINEAR)");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue