Merge pull request #20259 from taosdata/fix/main_bugfix_wxy
fix: create stream syntax check
This commit is contained in:
commit
b4c7c16aa9
|
@ -41,14 +41,15 @@ extern "C" {
|
|||
#define SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
|
||||
|
||||
#define SHOW_ALIVE_RESULT_COLS 1
|
||||
#define PRIVILEGE_TYPE_MASK(n) (1 << n)
|
||||
|
||||
#define PRIVILEGE_TYPE_ALL PRIVILEGE_TYPE_MASK(0)
|
||||
#define PRIVILEGE_TYPE_READ PRIVILEGE_TYPE_MASK(1)
|
||||
#define PRIVILEGE_TYPE_WRITE PRIVILEGE_TYPE_MASK(2)
|
||||
#define PRIVILEGE_TYPE_SUBSCRIBE PRIVILEGE_TYPE_MASK(3)
|
||||
#define BIT_FLAG_MASK(n) (1 << n)
|
||||
#define BIT_FLAG_SET_MASK(val, mask) ((val) |= (mask))
|
||||
#define BIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||
|
||||
#define PRIVILEGE_TYPE_TEST_MASK(val, mask) (((val) & (mask)) != 0)
|
||||
#define PRIVILEGE_TYPE_ALL BIT_FLAG_MASK(0)
|
||||
#define PRIVILEGE_TYPE_READ BIT_FLAG_MASK(1)
|
||||
#define PRIVILEGE_TYPE_WRITE BIT_FLAG_MASK(2)
|
||||
#define PRIVILEGE_TYPE_SUBSCRIBE BIT_FLAG_MASK(3)
|
||||
|
||||
typedef struct SDatabaseOptions {
|
||||
ENodeType type;
|
||||
|
@ -393,6 +394,15 @@ typedef struct SKillQueryStmt {
|
|||
char queryId[TSDB_QUERY_ID_LEN];
|
||||
} SKillQueryStmt;
|
||||
|
||||
typedef enum EStreamOptionsSetFlag {
|
||||
SOPT_TRIGGER_TYPE_SET = BIT_FLAG_MASK(0),
|
||||
SOPT_WATERMARK_SET = BIT_FLAG_MASK(1),
|
||||
SOPT_DELETE_MARK_SET = BIT_FLAG_MASK(2),
|
||||
SOPT_FILL_HISTORY_SET = BIT_FLAG_MASK(3),
|
||||
SOPT_IGNORE_EXPIRED_SET = BIT_FLAG_MASK(4),
|
||||
SOPT_IGNORE_UPDATE_SET = BIT_FLAG_MASK(5),
|
||||
} EStreamOptionsSetFlag;
|
||||
|
||||
typedef struct SStreamOptions {
|
||||
ENodeType type;
|
||||
int8_t triggerType;
|
||||
|
@ -402,6 +412,7 @@ typedef struct SStreamOptions {
|
|||
int8_t fillHistory;
|
||||
int8_t ignoreExpired;
|
||||
int8_t ignoreUpdate;
|
||||
int64_t setFlag;
|
||||
} SStreamOptions;
|
||||
|
||||
typedef struct SCreateStreamStmt {
|
||||
|
|
|
@ -215,6 +215,8 @@ SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool
|
|||
const SToken* pLibPath, SDataType dataType, int32_t bufSize);
|
||||
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pFuncName);
|
||||
SNode* createStreamOptions(SAstCreateContext* pCxt);
|
||||
SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken,
|
||||
SNode* pNode);
|
||||
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable,
|
||||
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols);
|
||||
SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName);
|
||||
|
|
|
@ -562,14 +562,14 @@ tag_def_or_ref_opt(A) ::= tags_def(B).
|
|||
tag_def_or_ref_opt(A) ::= TAGS NK_LP col_name_list(B) NK_RP. { A = B; }
|
||||
|
||||
stream_options(A) ::= . { A = createStreamOptions(pCxt); }
|
||||
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; }
|
||||
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; }
|
||||
stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; }
|
||||
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; }
|
||||
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; }
|
||||
stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; }
|
||||
stream_options(A) ::= stream_options(B) DELETE_MARK duration_literal(C). { ((SStreamOptions*)B)->pDeleteMark = releaseRawExprNode(pCxt, C); A = B; }
|
||||
stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreUpdate = taosStr2Int8(C.z, NULL, 10); A = B; }
|
||||
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); }
|
||||
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); }
|
||||
stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY(C) duration_literal(D). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, releaseRawExprNode(pCxt, D)); }
|
||||
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_WATERMARK_SET, NULL, releaseRawExprNode(pCxt, C)); }
|
||||
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_EXPIRED_SET, &C, NULL); }
|
||||
stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_FILL_HISTORY_SET, &C, NULL); }
|
||||
stream_options(A) ::= stream_options(B) DELETE_MARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_DELETE_MARK_SET, NULL, releaseRawExprNode(pCxt, C)); }
|
||||
stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_UPDATE_SET, &C, NULL); }
|
||||
|
||||
subtable_opt(A) ::= . { A = NULL; }
|
||||
subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }
|
||||
|
|
|
@ -1817,6 +1817,59 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
|
|||
return (SNode*)pOptions;
|
||||
}
|
||||
|
||||
static int8_t getTriggerType(uint32_t tokenType) {
|
||||
switch (tokenType) {
|
||||
case TK_AT_ONCE:
|
||||
return STREAM_TRIGGER_AT_ONCE;
|
||||
case TK_WINDOW_CLOSE:
|
||||
return STREAM_TRIGGER_WINDOW_CLOSE;
|
||||
case TK_MAX_DELAY:
|
||||
return STREAM_TRIGGER_MAX_DELAY;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
return STREAM_TRIGGER_WINDOW_CLOSE;
|
||||
}
|
||||
|
||||
SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken,
|
||||
SNode* pNode) {
|
||||
SStreamOptions* pStreamOptions = (SStreamOptions*)pOptions;
|
||||
if (BIT_FLAG_TEST_MASK(setflag, pStreamOptions->setFlag)) {
|
||||
pCxt->errCode =
|
||||
generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "stream options each item is only set once");
|
||||
return pOptions;
|
||||
}
|
||||
|
||||
switch (setflag) {
|
||||
case SOPT_TRIGGER_TYPE_SET:
|
||||
pStreamOptions->triggerType = getTriggerType(pToken->type);
|
||||
if (STREAM_TRIGGER_MAX_DELAY == pStreamOptions->triggerType) {
|
||||
pStreamOptions->pDelay = pNode;
|
||||
}
|
||||
break;
|
||||
case SOPT_WATERMARK_SET:
|
||||
pStreamOptions->pWatermark = pNode;
|
||||
break;
|
||||
case SOPT_DELETE_MARK_SET:
|
||||
pStreamOptions->pDeleteMark = pNode;
|
||||
break;
|
||||
case SOPT_FILL_HISTORY_SET:
|
||||
pStreamOptions->fillHistory = taosStr2Int8(pToken->z, NULL, 10);
|
||||
break;
|
||||
case SOPT_IGNORE_EXPIRED_SET:
|
||||
pStreamOptions->ignoreExpired = taosStr2Int8(pToken->z, NULL, 10);
|
||||
break;
|
||||
case SOPT_IGNORE_UPDATE_SET:
|
||||
pStreamOptions->ignoreUpdate = taosStr2Int8(pToken->z, NULL, 10);
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
BIT_FLAG_SET_MASK(pStreamOptions->setFlag, setflag);
|
||||
|
||||
return pOptions;
|
||||
}
|
||||
|
||||
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable,
|
||||
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols) {
|
||||
CHECK_PARSER_STATUS(pCxt);
|
||||
|
|
|
@ -6384,15 +6384,15 @@ static int32_t translateDropFunction(STranslateContext* pCxt, SDropFunctionStmt*
|
|||
|
||||
static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) {
|
||||
SAlterUserReq req = {0};
|
||||
if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
|
||||
(PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) &&
|
||||
PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) {
|
||||
if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
|
||||
(BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) &&
|
||||
BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) {
|
||||
req.alterType = TSDB_ALTER_USER_ADD_ALL_DB;
|
||||
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) {
|
||||
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) {
|
||||
req.alterType = TSDB_ALTER_USER_ADD_READ_DB;
|
||||
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) {
|
||||
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) {
|
||||
req.alterType = TSDB_ALTER_USER_ADD_WRITE_DB;
|
||||
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) {
|
||||
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) {
|
||||
req.alterType = TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC;
|
||||
}
|
||||
strcpy(req.user, pStmt->userName);
|
||||
|
@ -6402,15 +6402,15 @@ static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) {
|
|||
|
||||
static int32_t translateRevoke(STranslateContext* pCxt, SRevokeStmt* pStmt) {
|
||||
SAlterUserReq req = {0};
|
||||
if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
|
||||
(PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) &&
|
||||
PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) {
|
||||
if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
|
||||
(BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) &&
|
||||
BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) {
|
||||
req.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB;
|
||||
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) {
|
||||
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) {
|
||||
req.alterType = TSDB_ALTER_USER_REMOVE_READ_DB;
|
||||
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) {
|
||||
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) {
|
||||
req.alterType = TSDB_ALTER_USER_REMOVE_WRITE_DB;
|
||||
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) {
|
||||
} else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) {
|
||||
req.alterType = TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC;
|
||||
}
|
||||
strcpy(req.user, pStmt->userName);
|
||||
|
@ -6482,11 +6482,11 @@ static int32_t translateShowCreateDatabase(STranslateContext* pCxt, SShowCreateD
|
|||
if (NULL == pStmt->pCfg) {
|
||||
return TSDB_CODE_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
|
||||
SName name;
|
||||
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
|
||||
tNameGetFullDbName(&name, pStmt->dbFName);
|
||||
|
||||
|
||||
return getDBCfg(pCxt, pStmt->dbName, (SDbCfgInfo*)pStmt->pCfg);
|
||||
}
|
||||
|
||||
|
|
|
@ -4608,7 +4608,6 @@ static YYACTIONTYPE yy_reduce(
|
|||
{ yymsp[1].minor.yy42 = createStreamOptions(pCxt); }
|
||||
break;
|
||||
case 279: /* sma_stream_opt ::= sma_stream_opt WATERMARK duration_literal */
|
||||
case 316: /* stream_options ::= stream_options WATERMARK duration_literal */ yytestcase(yyruleno==316);
|
||||
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->pWatermark = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-2].minor.yy42; }
|
||||
yymsp[-2].minor.yy42 = yylhsminor.yy42;
|
||||
break;
|
||||
|
@ -4617,7 +4616,6 @@ static YYACTIONTYPE yy_reduce(
|
|||
yymsp[-2].minor.yy42 = yylhsminor.yy42;
|
||||
break;
|
||||
case 281: /* sma_stream_opt ::= sma_stream_opt DELETE_MARK duration_literal */
|
||||
case 319: /* stream_options ::= stream_options DELETE_MARK duration_literal */ yytestcase(yyruleno==319);
|
||||
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->pDeleteMark = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-2].minor.yy42; }
|
||||
yymsp[-2].minor.yy42 = yylhsminor.yy42;
|
||||
break;
|
||||
|
@ -4677,27 +4675,32 @@ static YYACTIONTYPE yy_reduce(
|
|||
{ pCxt->pRootNode = createDropStreamStmt(pCxt, yymsp[-1].minor.yy103, &yymsp[0].minor.yy225); }
|
||||
break;
|
||||
case 313: /* stream_options ::= stream_options TRIGGER AT_ONCE */
|
||||
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->triggerType = STREAM_TRIGGER_AT_ONCE; yylhsminor.yy42 = yymsp[-2].minor.yy42; }
|
||||
yymsp[-2].minor.yy42 = yylhsminor.yy42;
|
||||
break;
|
||||
case 314: /* stream_options ::= stream_options TRIGGER WINDOW_CLOSE */
|
||||
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; yylhsminor.yy42 = yymsp[-2].minor.yy42; }
|
||||
case 314: /* stream_options ::= stream_options TRIGGER WINDOW_CLOSE */ yytestcase(yyruleno==314);
|
||||
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_TRIGGER_TYPE_SET, &yymsp[0].minor.yy0, NULL); }
|
||||
yymsp[-2].minor.yy42 = yylhsminor.yy42;
|
||||
break;
|
||||
case 315: /* stream_options ::= stream_options TRIGGER MAX_DELAY duration_literal */
|
||||
{ ((SStreamOptions*)yymsp[-3].minor.yy42)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)yymsp[-3].minor.yy42)->pDelay = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-3].minor.yy42; }
|
||||
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_TRIGGER_TYPE_SET, &yymsp[-1].minor.yy0, releaseRawExprNode(pCxt, yymsp[0].minor.yy42)); }
|
||||
yymsp[-3].minor.yy42 = yylhsminor.yy42;
|
||||
break;
|
||||
case 316: /* stream_options ::= stream_options WATERMARK duration_literal */
|
||||
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_WATERMARK_SET, NULL, releaseRawExprNode(pCxt, yymsp[0].minor.yy42)); }
|
||||
yymsp[-2].minor.yy42 = yylhsminor.yy42;
|
||||
break;
|
||||
case 317: /* stream_options ::= stream_options IGNORE EXPIRED NK_INTEGER */
|
||||
{ ((SStreamOptions*)yymsp[-3].minor.yy42)->ignoreExpired = taosStr2Int8(yymsp[0].minor.yy0.z, NULL, 10); yylhsminor.yy42 = yymsp[-3].minor.yy42; }
|
||||
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_IGNORE_EXPIRED_SET, &yymsp[0].minor.yy0, NULL); }
|
||||
yymsp[-3].minor.yy42 = yylhsminor.yy42;
|
||||
break;
|
||||
case 318: /* stream_options ::= stream_options FILL_HISTORY NK_INTEGER */
|
||||
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->fillHistory = taosStr2Int8(yymsp[0].minor.yy0.z, NULL, 10); yylhsminor.yy42 = yymsp[-2].minor.yy42; }
|
||||
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_FILL_HISTORY_SET, &yymsp[0].minor.yy0, NULL); }
|
||||
yymsp[-2].minor.yy42 = yylhsminor.yy42;
|
||||
break;
|
||||
case 319: /* stream_options ::= stream_options DELETE_MARK duration_literal */
|
||||
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_DELETE_MARK_SET, NULL, releaseRawExprNode(pCxt, yymsp[0].minor.yy42)); }
|
||||
yymsp[-2].minor.yy42 = yylhsminor.yy42;
|
||||
break;
|
||||
case 320: /* stream_options ::= stream_options IGNORE UPDATE NK_INTEGER */
|
||||
{ ((SStreamOptions*)yymsp[-3].minor.yy42)->ignoreUpdate = taosStr2Int8(yymsp[0].minor.yy0.z, NULL, 10); yylhsminor.yy42 = yymsp[-3].minor.yy42; }
|
||||
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_IGNORE_UPDATE_SET, &yymsp[0].minor.yy0, NULL); }
|
||||
yymsp[-3].minor.yy42 = yylhsminor.yy42;
|
||||
break;
|
||||
case 322: /* subtable_opt ::= SUBTABLE NK_LP expression NK_RP */
|
||||
|
|
Loading…
Reference in New Issue