Merge branch 'main' into feature/3_liaohj

This commit is contained in:
Haojun Liao 2023-03-05 19:09:37 +08:00
commit caeb976dfe
10 changed files with 114 additions and 42 deletions

View File

@ -76,6 +76,7 @@ extern int64_t tsVndCommitMaxIntervalMs;
// mnode // mnode
extern int64_t tsMndSdbWriteDelta; extern int64_t tsMndSdbWriteDelta;
extern int64_t tsMndLogRetention;
// monitor // monitor
extern bool tsEnableMonitor; extern bool tsEnableMonitor;

View File

@ -41,14 +41,15 @@ extern "C" {
#define SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE) #define SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE)
#define SHOW_ALIVE_RESULT_COLS 1 #define SHOW_ALIVE_RESULT_COLS 1
#define PRIVILEGE_TYPE_MASK(n) (1 << n)
#define PRIVILEGE_TYPE_ALL PRIVILEGE_TYPE_MASK(0) #define BIT_FLAG_MASK(n) (1 << n)
#define PRIVILEGE_TYPE_READ PRIVILEGE_TYPE_MASK(1) #define BIT_FLAG_SET_MASK(val, mask) ((val) |= (mask))
#define PRIVILEGE_TYPE_WRITE PRIVILEGE_TYPE_MASK(2) #define BIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0)
#define PRIVILEGE_TYPE_SUBSCRIBE PRIVILEGE_TYPE_MASK(3)
#define PRIVILEGE_TYPE_TEST_MASK(val, mask) (((val) & (mask)) != 0) #define PRIVILEGE_TYPE_ALL BIT_FLAG_MASK(0)
#define PRIVILEGE_TYPE_READ BIT_FLAG_MASK(1)
#define PRIVILEGE_TYPE_WRITE BIT_FLAG_MASK(2)
#define PRIVILEGE_TYPE_SUBSCRIBE BIT_FLAG_MASK(3)
typedef struct SDatabaseOptions { typedef struct SDatabaseOptions {
ENodeType type; ENodeType type;
@ -393,6 +394,15 @@ typedef struct SKillQueryStmt {
char queryId[TSDB_QUERY_ID_LEN]; char queryId[TSDB_QUERY_ID_LEN];
} SKillQueryStmt; } SKillQueryStmt;
typedef enum EStreamOptionsSetFlag {
SOPT_TRIGGER_TYPE_SET = BIT_FLAG_MASK(0),
SOPT_WATERMARK_SET = BIT_FLAG_MASK(1),
SOPT_DELETE_MARK_SET = BIT_FLAG_MASK(2),
SOPT_FILL_HISTORY_SET = BIT_FLAG_MASK(3),
SOPT_IGNORE_EXPIRED_SET = BIT_FLAG_MASK(4),
SOPT_IGNORE_UPDATE_SET = BIT_FLAG_MASK(5),
} EStreamOptionsSetFlag;
typedef struct SStreamOptions { typedef struct SStreamOptions {
ENodeType type; ENodeType type;
int8_t triggerType; int8_t triggerType;
@ -402,6 +412,7 @@ typedef struct SStreamOptions {
int8_t fillHistory; int8_t fillHistory;
int8_t ignoreExpired; int8_t ignoreExpired;
int8_t ignoreUpdate; int8_t ignoreUpdate;
int64_t setFlag;
} SStreamOptions; } SStreamOptions;
typedef struct SCreateStreamStmt { typedef struct SCreateStreamStmt {

View File

@ -35,7 +35,6 @@ extern "C" {
#define SYNC_MAX_RECV_TIME_RANGE_MS 1200 #define SYNC_MAX_RECV_TIME_RANGE_MS 1200
#define SYNC_DEL_WAL_MS (1000 * 60) #define SYNC_DEL_WAL_MS (1000 * 60)
#define SYNC_ADD_QUORUM_COUNT 3 #define SYNC_ADD_QUORUM_COUNT 3
#define SYNC_MNODE_LOG_RETENTION 10000
#define SYNC_VNODE_LOG_RETENTION (TSDB_SYNC_LOG_BUFFER_RETENTION + 1) #define SYNC_VNODE_LOG_RETENTION (TSDB_SYNC_LOG_BUFFER_RETENTION + 1)
#define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10 #define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10
#define SNAPSHOT_WAIT_MS 1000 * 30 #define SNAPSHOT_WAIT_MS 1000 * 30

View File

@ -66,7 +66,8 @@ int32_t tsHeartbeatTimeout = 20 * 1000;
int64_t tsVndCommitMaxIntervalMs = 600 * 1000; int64_t tsVndCommitMaxIntervalMs = 600 * 1000;
// mnode // mnode
int64_t tsMndSdbWriteDelta = 2000; int64_t tsMndSdbWriteDelta = 200;
int64_t tsMndLogRetention = 2000;
// monitor // monitor
bool tsEnableMonitor = true; bool tsEnableMonitor = true;
@ -461,6 +462,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt64(pCfg, "vndCommitMaxInterval", tsVndCommitMaxIntervalMs, 1000, 1000 * 60 * 60, 0) != 0) return -1; if (cfgAddInt64(pCfg, "vndCommitMaxInterval", tsVndCommitMaxIntervalMs, 1000, 1000 * 60 * 60, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, 0) != 0) return -1; if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, 0) != 0) return -1;
if (cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1;
@ -808,6 +810,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsVndCommitMaxIntervalMs = cfgGetItem(pCfg, "vndCommitMaxInterval")->i64; tsVndCommitMaxIntervalMs = cfgGetItem(pCfg, "vndCommitMaxInterval")->i64;
tsMndSdbWriteDelta = cfgGetItem(pCfg, "mndSdbWriteDelta")->i64; tsMndSdbWriteDelta = cfgGetItem(pCfg, "mndSdbWriteDelta")->i64;
tsMndLogRetention = cfgGetItem(pCfg, "mndLogRetention")->i64;
tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tsStartUdfd = cfgGetItem(pCfg, "udf")->bval;
tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs));

View File

@ -215,6 +215,8 @@ SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool
const SToken* pLibPath, SDataType dataType, int32_t bufSize); const SToken* pLibPath, SDataType dataType, int32_t bufSize);
SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pFuncName); SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pFuncName);
SNode* createStreamOptions(SAstCreateContext* pCxt); SNode* createStreamOptions(SAstCreateContext* pCxt);
SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken,
SNode* pNode);
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable, SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable,
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols); SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols);
SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName); SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName);

View File

@ -562,14 +562,14 @@ tag_def_or_ref_opt(A) ::= tags_def(B).
tag_def_or_ref_opt(A) ::= TAGS NK_LP col_name_list(B) NK_RP. { A = B; } tag_def_or_ref_opt(A) ::= TAGS NK_LP col_name_list(B) NK_RP. { A = B; }
stream_options(A) ::= . { A = createStreamOptions(pCxt); } stream_options(A) ::= . { A = createStreamOptions(pCxt); }
stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; } stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); }
stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; } stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); }
stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; } stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY(C) duration_literal(D). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, releaseRawExprNode(pCxt, D)); }
stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; } stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_WATERMARK_SET, NULL, releaseRawExprNode(pCxt, C)); }
stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; } stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_EXPIRED_SET, &C, NULL); }
stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; } stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_FILL_HISTORY_SET, &C, NULL); }
stream_options(A) ::= stream_options(B) DELETE_MARK duration_literal(C). { ((SStreamOptions*)B)->pDeleteMark = releaseRawExprNode(pCxt, C); A = B; } stream_options(A) ::= stream_options(B) DELETE_MARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_DELETE_MARK_SET, NULL, releaseRawExprNode(pCxt, C)); }
stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreUpdate = taosStr2Int8(C.z, NULL, 10); A = B; } stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_UPDATE_SET, &C, NULL); }
subtable_opt(A) ::= . { A = NULL; } subtable_opt(A) ::= . { A = NULL; }
subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); } subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); }

View File

@ -1817,6 +1817,59 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) {
return (SNode*)pOptions; return (SNode*)pOptions;
} }
static int8_t getTriggerType(uint32_t tokenType) {
switch (tokenType) {
case TK_AT_ONCE:
return STREAM_TRIGGER_AT_ONCE;
case TK_WINDOW_CLOSE:
return STREAM_TRIGGER_WINDOW_CLOSE;
case TK_MAX_DELAY:
return STREAM_TRIGGER_MAX_DELAY;
default:
break;
}
return STREAM_TRIGGER_WINDOW_CLOSE;
}
SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken,
SNode* pNode) {
SStreamOptions* pStreamOptions = (SStreamOptions*)pOptions;
if (BIT_FLAG_TEST_MASK(setflag, pStreamOptions->setFlag)) {
pCxt->errCode =
generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "stream options each item is only set once");
return pOptions;
}
switch (setflag) {
case SOPT_TRIGGER_TYPE_SET:
pStreamOptions->triggerType = getTriggerType(pToken->type);
if (STREAM_TRIGGER_MAX_DELAY == pStreamOptions->triggerType) {
pStreamOptions->pDelay = pNode;
}
break;
case SOPT_WATERMARK_SET:
pStreamOptions->pWatermark = pNode;
break;
case SOPT_DELETE_MARK_SET:
pStreamOptions->pDeleteMark = pNode;
break;
case SOPT_FILL_HISTORY_SET:
pStreamOptions->fillHistory = taosStr2Int8(pToken->z, NULL, 10);
break;
case SOPT_IGNORE_EXPIRED_SET:
pStreamOptions->ignoreExpired = taosStr2Int8(pToken->z, NULL, 10);
break;
case SOPT_IGNORE_UPDATE_SET:
pStreamOptions->ignoreUpdate = taosStr2Int8(pToken->z, NULL, 10);
break;
default:
break;
}
BIT_FLAG_SET_MASK(pStreamOptions->setFlag, setflag);
return pOptions;
}
SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable, SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable,
SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols) { SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols) {
CHECK_PARSER_STATUS(pCxt); CHECK_PARSER_STATUS(pCxt);

View File

@ -6384,15 +6384,15 @@ static int32_t translateDropFunction(STranslateContext* pCxt, SDropFunctionStmt*
static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) { static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) {
SAlterUserReq req = {0}; SAlterUserReq req = {0};
if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) || if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
(PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) && (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) &&
PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) { BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) {
req.alterType = TSDB_ALTER_USER_ADD_ALL_DB; req.alterType = TSDB_ALTER_USER_ADD_ALL_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) { } else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) {
req.alterType = TSDB_ALTER_USER_ADD_READ_DB; req.alterType = TSDB_ALTER_USER_ADD_READ_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) { } else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) {
req.alterType = TSDB_ALTER_USER_ADD_WRITE_DB; req.alterType = TSDB_ALTER_USER_ADD_WRITE_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) { } else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) {
req.alterType = TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC; req.alterType = TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC;
} }
strcpy(req.user, pStmt->userName); strcpy(req.user, pStmt->userName);
@ -6402,15 +6402,15 @@ static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) {
static int32_t translateRevoke(STranslateContext* pCxt, SRevokeStmt* pStmt) { static int32_t translateRevoke(STranslateContext* pCxt, SRevokeStmt* pStmt) {
SAlterUserReq req = {0}; SAlterUserReq req = {0};
if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) || if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) ||
(PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) && (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) &&
PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) { BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) {
req.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB; req.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) { } else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) {
req.alterType = TSDB_ALTER_USER_REMOVE_READ_DB; req.alterType = TSDB_ALTER_USER_REMOVE_READ_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) { } else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) {
req.alterType = TSDB_ALTER_USER_REMOVE_WRITE_DB; req.alterType = TSDB_ALTER_USER_REMOVE_WRITE_DB;
} else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) { } else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) {
req.alterType = TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC; req.alterType = TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC;
} }
strcpy(req.user, pStmt->userName); strcpy(req.user, pStmt->userName);
@ -6482,11 +6482,11 @@ static int32_t translateShowCreateDatabase(STranslateContext* pCxt, SShowCreateD
if (NULL == pStmt->pCfg) { if (NULL == pStmt->pCfg) {
return TSDB_CODE_OUT_OF_MEMORY; return TSDB_CODE_OUT_OF_MEMORY;
} }
SName name; SName name;
tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName)); tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName));
tNameGetFullDbName(&name, pStmt->dbFName); tNameGetFullDbName(&name, pStmt->dbFName);
return getDBCfg(pCxt, pStmt->dbName, (SDbCfgInfo*)pStmt->pCfg); return getDBCfg(pCxt, pStmt->dbName, (SDbCfgInfo*)pStmt->pCfg);
} }

View File

@ -4608,7 +4608,6 @@ static YYACTIONTYPE yy_reduce(
{ yymsp[1].minor.yy42 = createStreamOptions(pCxt); } { yymsp[1].minor.yy42 = createStreamOptions(pCxt); }
break; break;
case 279: /* sma_stream_opt ::= sma_stream_opt WATERMARK duration_literal */ case 279: /* sma_stream_opt ::= sma_stream_opt WATERMARK duration_literal */
case 316: /* stream_options ::= stream_options WATERMARK duration_literal */ yytestcase(yyruleno==316);
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->pWatermark = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-2].minor.yy42; } { ((SStreamOptions*)yymsp[-2].minor.yy42)->pWatermark = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-2].minor.yy42; }
yymsp[-2].minor.yy42 = yylhsminor.yy42; yymsp[-2].minor.yy42 = yylhsminor.yy42;
break; break;
@ -4617,7 +4616,6 @@ static YYACTIONTYPE yy_reduce(
yymsp[-2].minor.yy42 = yylhsminor.yy42; yymsp[-2].minor.yy42 = yylhsminor.yy42;
break; break;
case 281: /* sma_stream_opt ::= sma_stream_opt DELETE_MARK duration_literal */ case 281: /* sma_stream_opt ::= sma_stream_opt DELETE_MARK duration_literal */
case 319: /* stream_options ::= stream_options DELETE_MARK duration_literal */ yytestcase(yyruleno==319);
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->pDeleteMark = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-2].minor.yy42; } { ((SStreamOptions*)yymsp[-2].minor.yy42)->pDeleteMark = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-2].minor.yy42; }
yymsp[-2].minor.yy42 = yylhsminor.yy42; yymsp[-2].minor.yy42 = yylhsminor.yy42;
break; break;
@ -4677,27 +4675,32 @@ static YYACTIONTYPE yy_reduce(
{ pCxt->pRootNode = createDropStreamStmt(pCxt, yymsp[-1].minor.yy103, &yymsp[0].minor.yy225); } { pCxt->pRootNode = createDropStreamStmt(pCxt, yymsp[-1].minor.yy103, &yymsp[0].minor.yy225); }
break; break;
case 313: /* stream_options ::= stream_options TRIGGER AT_ONCE */ case 313: /* stream_options ::= stream_options TRIGGER AT_ONCE */
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->triggerType = STREAM_TRIGGER_AT_ONCE; yylhsminor.yy42 = yymsp[-2].minor.yy42; } case 314: /* stream_options ::= stream_options TRIGGER WINDOW_CLOSE */ yytestcase(yyruleno==314);
yymsp[-2].minor.yy42 = yylhsminor.yy42; { yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_TRIGGER_TYPE_SET, &yymsp[0].minor.yy0, NULL); }
break;
case 314: /* stream_options ::= stream_options TRIGGER WINDOW_CLOSE */
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; yylhsminor.yy42 = yymsp[-2].minor.yy42; }
yymsp[-2].minor.yy42 = yylhsminor.yy42; yymsp[-2].minor.yy42 = yylhsminor.yy42;
break; break;
case 315: /* stream_options ::= stream_options TRIGGER MAX_DELAY duration_literal */ case 315: /* stream_options ::= stream_options TRIGGER MAX_DELAY duration_literal */
{ ((SStreamOptions*)yymsp[-3].minor.yy42)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)yymsp[-3].minor.yy42)->pDelay = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-3].minor.yy42; } { yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_TRIGGER_TYPE_SET, &yymsp[-1].minor.yy0, releaseRawExprNode(pCxt, yymsp[0].minor.yy42)); }
yymsp[-3].minor.yy42 = yylhsminor.yy42; yymsp[-3].minor.yy42 = yylhsminor.yy42;
break; break;
case 316: /* stream_options ::= stream_options WATERMARK duration_literal */
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_WATERMARK_SET, NULL, releaseRawExprNode(pCxt, yymsp[0].minor.yy42)); }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
case 317: /* stream_options ::= stream_options IGNORE EXPIRED NK_INTEGER */ case 317: /* stream_options ::= stream_options IGNORE EXPIRED NK_INTEGER */
{ ((SStreamOptions*)yymsp[-3].minor.yy42)->ignoreExpired = taosStr2Int8(yymsp[0].minor.yy0.z, NULL, 10); yylhsminor.yy42 = yymsp[-3].minor.yy42; } { yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_IGNORE_EXPIRED_SET, &yymsp[0].minor.yy0, NULL); }
yymsp[-3].minor.yy42 = yylhsminor.yy42; yymsp[-3].minor.yy42 = yylhsminor.yy42;
break; break;
case 318: /* stream_options ::= stream_options FILL_HISTORY NK_INTEGER */ case 318: /* stream_options ::= stream_options FILL_HISTORY NK_INTEGER */
{ ((SStreamOptions*)yymsp[-2].minor.yy42)->fillHistory = taosStr2Int8(yymsp[0].minor.yy0.z, NULL, 10); yylhsminor.yy42 = yymsp[-2].minor.yy42; } { yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_FILL_HISTORY_SET, &yymsp[0].minor.yy0, NULL); }
yymsp[-2].minor.yy42 = yylhsminor.yy42;
break;
case 319: /* stream_options ::= stream_options DELETE_MARK duration_literal */
{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_DELETE_MARK_SET, NULL, releaseRawExprNode(pCxt, yymsp[0].minor.yy42)); }
yymsp[-2].minor.yy42 = yylhsminor.yy42; yymsp[-2].minor.yy42 = yylhsminor.yy42;
break; break;
case 320: /* stream_options ::= stream_options IGNORE UPDATE NK_INTEGER */ case 320: /* stream_options ::= stream_options IGNORE UPDATE NK_INTEGER */
{ ((SStreamOptions*)yymsp[-3].minor.yy42)->ignoreUpdate = taosStr2Int8(yymsp[0].minor.yy0.z, NULL, 10); yylhsminor.yy42 = yymsp[-3].minor.yy42; } { yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_IGNORE_UPDATE_SET, &yymsp[0].minor.yy0, NULL); }
yymsp[-3].minor.yy42 = yylhsminor.yy42; yymsp[-3].minor.yy42 = yylhsminor.yy42;
break; break;
case 322: /* subtable_opt ::= SUBTABLE NK_LP expression NK_RP */ case 322: /* subtable_opt ::= SUBTABLE NK_LP expression NK_RP */

View File

@ -285,7 +285,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) {
if (syncNodeIsMnode(pSyncNode)) { if (syncNodeIsMnode(pSyncNode)) {
// mnode // mnode
logRetention = SYNC_MNODE_LOG_RETENTION; logRetention = tsMndLogRetention;
} else { } else {
// vnode // vnode
if (pSyncNode->replicaNum > 1) { if (pSyncNode->replicaNum > 1) {