add stream check

This commit is contained in:
54liuyao 2024-03-25 11:16:28 +08:00
parent a02535c3ab
commit d1b974228f
2 changed files with 87 additions and 4 deletions

View File

@ -3981,7 +3981,12 @@ static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* p
}
if (TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!hasPartitionByTbname(pSelect->pPartitionByList)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"State window for stream on super table must patitioned by table name");
}
if ((SRealTableNode*)pSelect->pFromTable && hasPkInTable(((SRealTableNode*)pSelect->pFromTable)->pMeta)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Source table of State window must not has primary key");
}
return TSDB_CODE_SUCCESS;
}
@ -7969,6 +7974,13 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Event window for stream on super table must patitioned by table name");
}
if (pSelect->pWindow != NULL && pSelect->pWindow->type == QUERY_NODE_EVENT_WINDOW &&
(SRealTableNode*)pSelect->pFromTable && hasPkInTable(((SRealTableNode*)pSelect->pFromTable)->pMeta)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Source table of Event window must not has primary key");
}
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
!isTimeLineQuery(pStmt->pQuery) || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList ||
crossTableWithUdaf(pSelect) || hasJsonTypeProjection(pSelect)) {
@ -8011,6 +8023,11 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Ignore expired data of Count window must be 1.");
}
if ((SRealTableNode*)pSelect->pFromTable && hasPkInTable(((SRealTableNode*)pSelect->pFromTable)->pMeta)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Source table of Count window must not has primary key");
}
}
return TSDB_CODE_SUCCESS;
@ -8499,15 +8516,35 @@ static int32_t createLastTsSelectStmt(char* pDb, char* pTable, STableMeta* pMeta
static int32_t checkStreamDestTableSchema(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
SNode* pProj = nodesListGetNode(pStmt->pCols, 0);
SColumnDefNode* pCol = (SColumnDefNode*)pProj;
SNode* pNode = nodesListGetNode(pStmt->pCols, 0);
SColumnDefNode* pCol = (SColumnDefNode*)pNode;
if (pCol && pCol->dataType.type != TSDB_DATA_TYPE_TIMESTAMP) {
pCol->dataType = (SDataType){.type = TSDB_DATA_TYPE_TIMESTAMP,
.precision = 0,
.scale = 0,
.bytes = tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
}
return checkTableSchemaImpl(pCxt, pStmt->pTags, pStmt->pCols, NULL);
int32_t code = checkTableSchemaImpl(pCxt, pStmt->pTags, pStmt->pCols, NULL);
if (TSDB_CODE_SUCCESS == code && NULL == pSelect->pWindow &&
((SRealTableNode*)pSelect->pFromTable && hasPkInTable(((SRealTableNode*)pSelect->pFromTable)->pMeta))) {
if (1 >= LIST_LENGTH(pStmt->pCols) || 1 >= LIST_LENGTH(pSelect->pProjectionList)) {
return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY);
}
SNode* pProj = nodesListGetNode(pSelect->pProjectionList, 1);
if (QUERY_NODE_COLUMN != nodeType(pProj) ||
0 != strcmp(((SColumnNode*)pProj)->colName, ((SRealTableNode*)pSelect->pFromTable)->pMeta->schema[1].name)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY,
"Source table has primary key, result must has primary key");
}
pNode = nodesListGetNode(pStmt->pCols, 1);
pCol = (SColumnDefNode*)pNode;
if (!pCol->is_pk) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Source table has primary key, dest table must has primary key");
}
}
return code;
}
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt, SCMCreateStreamReq* pReq) {

View File

@ -60,6 +60,52 @@ sql_error create stream streams14 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE
sql_error create stream streams15 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt15 tags(tc varchar(100) primary key) as select _wstart, count(*) c1, max(a) from st partition by tbname tc interval(10s);
sql create database test1 vgroups 4;
sql use test1;
sql create stable st(ts timestamp, a int primary key, b int, c int) tags(ta int,tb int,tc int);
sql create table t1 using st tags(1,1,1);
sql create table t2 using st tags(2,2,2);
sql_error create stream streams16 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt16 as select _wstart, count(*) c1, max(a) from st partition by tbname tc state_window(b);
sql_error create stream streams17 trigger at_once IGNORE EXPIRED 0 IGNORE UPDATE 0 into streamt17 as select _wstart, count(*) c1, max(a) from st partition by tbname tc event_window start with a = 0 end with a = 9;
sql_error create stream streams18 trigger at_once IGNORE EXPIRED 1 IGNORE UPDATE 0 watermark 10s into streamt18 as select _wstart, count(*) c1, max(a) from st partition by tbname tc count_window(2);
print ===== step2
print ===== scalar
sql create database test2 vgroups 4;
sql use test2;
sql create table t1 (ts timestamp, a int, b int);
sql create table rst(ts timestamp, a int primary key, b int) tags(ta varchar(100));
sql create table rct1 using rst tags("aa");
sql create table rst6(ts timestamp, a int primary key, b int) tags(ta varchar(100));
sql create table rst7(ts timestamp, a int primary key, b int) tags(ta varchar(100));
sql create stream streams19 trigger at_once ignore expired 0 ignore update 0 into streamt19 as select ts,a, b from t1;
sql create stream streams20 trigger at_once ignore expired 0 ignore update 0 into streamt20(ts, a primary key, b) as select ts,a, b from t1;
sql create stream streams21 trigger at_once ignore expired 0 ignore update 0 into rst as select ts,a, b from t1;
sql_error create stream streams22 trigger at_once ignore expired 0 ignore update 0 into streamt22 as select ts,1, b from rct1;
sql_error create stream streams23 trigger at_once ignore expired 0 ignore update 0 into streamt23 as select ts, a, b from rct1;
sql create stream streams24 trigger at_once ignore expired 0 ignore update 0 into streamt24(ts, a primary key, b) as select ts, a, b from rct1;
sql create stream streams25 trigger at_once ignore expired 0 ignore update 0 into rst6 as select ts, a, b from rct1;
sql_error create stream streams26 trigger at_once ignore expired 0 ignore update 0 into rst7 as select ts, 1,b from rct1;
sql_error create stream streams27 trigger at_once ignore expired 0 ignore update 0 into streamt27(ts, a primary key, b) as select ts, 1,b from rct1;
print ======over
system sh/stop_dnodes.sh