fix: add checks for stream query
This commit is contained in:
parent
536c6bdbc5
commit
f15517b1d7
|
@ -4729,15 +4729,8 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery)) {
|
if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery) ||
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
QUERY_NODE_REAL_TABLE != nodeType(((SSelectStmt*)pStmt->pQuery)->pFromTable)) {
|
||||||
}
|
|
||||||
|
|
||||||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery;
|
|
||||||
|
|
||||||
if (QUERY_NODE_REAL_TABLE != nodeType(pSelect->pFromTable) ||
|
|
||||||
TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
|
|
||||||
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect)) {
|
|
||||||
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -4770,12 +4763,23 @@ static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
|
||||||
|
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
|
||||||
|
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect)) {
|
||||||
|
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
|
||||||
|
}
|
||||||
|
return TSDB_CODE_SUCCESS;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SNode* pStmt, SCMCreateStreamReq* pReq) {
|
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SNode* pStmt, SCMCreateStreamReq* pReq) {
|
||||||
pCxt->createStream = true;
|
pCxt->createStream = true;
|
||||||
int32_t code = addWstartTsToCreateStreamQuery(pStmt);
|
int32_t code = addWstartTsToCreateStreamQuery(pStmt);
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
code = translateQuery(pCxt, pStmt);
|
code = translateQuery(pCxt, pStmt);
|
||||||
}
|
}
|
||||||
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
|
code = checkStreamQuery(pCxt, (SSelectStmt*)pStmt);
|
||||||
|
}
|
||||||
if (TSDB_CODE_SUCCESS == code) {
|
if (TSDB_CODE_SUCCESS == code) {
|
||||||
getSourceDatabase(pStmt, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
getSourceDatabase(pStmt, pCxt->pParseCxt->acctId, pReq->sourceDB);
|
||||||
code = nodesNodeToString(pStmt, false, &pReq->ast, NULL);
|
code = nodesNodeToString(pStmt, false, &pReq->ast, NULL);
|
||||||
|
|
Loading…
Reference in New Issue