From d1b974228f8782c53b13962f6b63775e89c52f0a Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Mon, 25 Mar 2024 11:16:28 +0800 Subject: [PATCH] add stream check --- source/libs/parser/src/parTranslater.c | 45 ++++++++++++++++++-- tests/script/tsim/stream/udTableAndCol0.sim | 46 +++++++++++++++++++++ 2 files changed, 87 insertions(+), 4 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 14a12b8893..2163113a66 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -3981,7 +3981,12 @@ static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* p } if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType && !hasPartitionByTbname(pSelect->pPartitionByList)) { - return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "State window for stream on super table must patitioned by table name"); + } + if ((SRealTableNode*)pSelect->pFromTable && hasPkInTable(((SRealTableNode*)pSelect->pFromTable)->pMeta)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Source table of State window must not has primary key"); } return TSDB_CODE_SUCCESS; } @@ -7969,6 +7974,13 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Event window for stream on super table must patitioned by table name"); } + + if (pSelect->pWindow != NULL && pSelect->pWindow->type == QUERY_NODE_EVENT_WINDOW && + (SRealTableNode*)pSelect->pFromTable && hasPkInTable(((SRealTableNode*)pSelect->pFromTable)->pMeta)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Source table of Event window must not has primary key"); + } + if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || !isTimeLineQuery(pStmt->pQuery) || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList || crossTableWithUdaf(pSelect) || hasJsonTypeProjection(pSelect)) { @@ -8011,6 +8023,11 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Ignore expired data of Count window must be 1."); } + + if ((SRealTableNode*)pSelect->pFromTable && hasPkInTable(((SRealTableNode*)pSelect->pFromTable)->pMeta)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Source table of Count window must not has primary key"); + } } return TSDB_CODE_SUCCESS; @@ -8499,15 +8516,35 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta static int32_t checkStreamDestTableSchema(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; - SNode* pProj = nodesListGetNode(pStmt->pCols, 0); - SColumnDefNode* pCol = (SColumnDefNode*)pProj; + SNode* pNode = nodesListGetNode(pStmt->pCols, 0); + SColumnDefNode* pCol = (SColumnDefNode*)pNode; if (pCol && pCol->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) { pCol->dataType = (SDataType){.type = TSDB_DATA_TYPE_TIMESTAMP, .precision = 0, .scale = 0, .bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes}; } - return checkTableSchemaImpl(pCxt, pStmt->pTags, pStmt->pCols, NULL); + int32_t code = checkTableSchemaImpl(pCxt, pStmt->pTags, pStmt->pCols, NULL); + if (TSDB_CODE_SUCCESS == code && NULL == pSelect->pWindow && + ((SRealTableNode*)pSelect->pFromTable && hasPkInTable(((SRealTableNode*)pSelect->pFromTable)->pMeta))) { + if (1 >= LIST_LENGTH(pStmt->pCols) || 1 >= LIST_LENGTH(pSelect->pProjectionList)) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY); + } + + SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 1); + if (QUERY_NODE_COLUMN != nodeType(pProj) || + 0 != strcmp(((SColumnNode*)pProj)->colName, ((SRealTableNode*)pSelect->pFromTable)->pMeta->schema[1].name)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "Source table has primary key, result must has primary key"); + } + + pNode = nodesListGetNode(pStmt->pCols, 1); + pCol = (SColumnDefNode*)pNode; + if (!pCol->is_pk) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Source table has primary key, dest table must has primary key"); + } + } + return code; } static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) { diff --git a/tests/script/tsim/stream/udTableAndCol0.sim b/tests/script/tsim/stream/udTableAndCol0.sim index 22561dc044..9c8a5aaaad 100644 --- a/tests/script/tsim/stream/udTableAndCol0.sim +++ b/tests/script/tsim/stream/udTableAndCol0.sim @@ -60,6 +60,52 @@ sql_error create stream streams14 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE sql_error create stream streams15 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt15 tags(tc varchar(100) primary key) as select _wstart, count(*) c1, max(a) from st partition by tbname tc interval(10s); +sql create database test1 vgroups 4; +sql use test1; + + +sql create stable st(ts timestamp, a int primary key, b int, c int) tags(ta int,tb int,tc int); +sql create table t1 using st tags(1,1,1); +sql create table t2 using st tags(2,2,2); + +sql_error create stream streams16 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt16 as select _wstart, count(*) c1, max(a) from st partition by tbname tc state_window(b); +sql_error create stream streams17 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt17 as select _wstart, count(*) c1, max(a) from st partition by tbname tc event_window start with a = 0 end with a = 9; +sql_error create stream streams18 trigger at_once IGNORE EXPIRED 1 IGNORE UPDATE 0 watermark 10s into streamt18 as select _wstart, count(*) c1, max(a) from st partition by tbname tc count_window(2); + +print ===== step2 +print ===== scalar + +sql create database test2 vgroups 4; +sql use test2; + +sql create table t1 (ts timestamp, a int, b int); + +sql create table rst(ts timestamp, a int primary key, b int) tags(ta varchar(100)); +sql create table rct1 using rst tags("aa"); + +sql create table rst6(ts timestamp, a int primary key, b int) tags(ta varchar(100)); +sql create table rst7(ts timestamp, a int primary key, b int) tags(ta varchar(100)); + +sql create stream streams19 trigger at_once ignore expired 0 ignore update 0 into streamt19 as select ts,a, b from t1; + +sql create stream streams20 trigger at_once ignore expired 0 ignore update 0 into streamt20(ts, a primary key, b) as select ts,a, b from t1; +sql create stream streams21 trigger at_once ignore expired 0 ignore update 0 into rst as select ts,a, b from t1; + +sql_error create stream streams22 trigger at_once ignore expired 0 ignore update 0 into streamt22 as select ts,1, b from rct1; + +sql_error create stream streams23 trigger at_once ignore expired 0 ignore update 0 into streamt23 as select ts, a, b from rct1; + +sql create stream streams24 trigger at_once ignore expired 0 ignore update 0 into streamt24(ts, a primary key, b) as select ts, a, b from rct1; +sql create stream streams25 trigger at_once ignore expired 0 ignore update 0 into rst6 as select ts, a, b from rct1; + +sql_error create stream streams26 trigger at_once ignore expired 0 ignore update 0 into rst7 as select ts, 1,b from rct1; + +sql_error create stream streams27 trigger at_once ignore expired 0 ignore update 0 into streamt27(ts, a primary key, b) as select ts, 1,b from rct1; + + + + + print ======over system sh/stop_dnodes.sh