From 9305b0efd0499093c6dfb91ed0623ed2f0d2bda9 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 3 Mar 2023 16:57:13 +0800 Subject: [PATCH 01/10] fix: create stream syntax check --- include/libs/nodes/cmdnodes.h | 23 ++++++++--- source/libs/parser/inc/parAst.h | 2 + source/libs/parser/inc/sql.y | 16 ++++---- source/libs/parser/src/parAstCreater.c | 53 ++++++++++++++++++++++++++ source/libs/parser/src/parTranslater.c | 28 +++++++------- source/libs/parser/src/sql.c | 25 ++++++------ 6 files changed, 108 insertions(+), 39 deletions(-) diff --git a/include/libs/nodes/cmdnodes.h b/include/libs/nodes/cmdnodes.h index a0fc8d1238..736ee7da05 100644 --- a/include/libs/nodes/cmdnodes.h +++ b/include/libs/nodes/cmdnodes.h @@ -41,14 +41,15 @@ extern "C" { #define SHOW_LOCAL_VARIABLES_RESULT_FIELD2_LEN (TSDB_CONFIG_VALUE_LEN + VARSTR_HEADER_SIZE) #define SHOW_ALIVE_RESULT_COLS 1 -#define PRIVILEGE_TYPE_MASK(n) (1 << n) -#define PRIVILEGE_TYPE_ALL PRIVILEGE_TYPE_MASK(0) -#define PRIVILEGE_TYPE_READ PRIVILEGE_TYPE_MASK(1) -#define PRIVILEGE_TYPE_WRITE PRIVILEGE_TYPE_MASK(2) -#define PRIVILEGE_TYPE_SUBSCRIBE PRIVILEGE_TYPE_MASK(3) +#define BIT_FLAG_MASK(n) (1 << n) +#define BIT_FLAG_SET_MASK(val, mask) ((val) |= (mask)) +#define BIT_FLAG_TEST_MASK(val, mask) (((val) & (mask)) != 0) -#define PRIVILEGE_TYPE_TEST_MASK(val, mask) (((val) & (mask)) != 0) +#define PRIVILEGE_TYPE_ALL BIT_FLAG_MASK(0) +#define PRIVILEGE_TYPE_READ BIT_FLAG_MASK(1) +#define PRIVILEGE_TYPE_WRITE BIT_FLAG_MASK(2) +#define PRIVILEGE_TYPE_SUBSCRIBE BIT_FLAG_MASK(3) typedef struct SDatabaseOptions { ENodeType type; @@ -393,6 +394,15 @@ typedef struct SKillQueryStmt { char queryId[TSDB_QUERY_ID_LEN]; } SKillQueryStmt; +typedef enum EStreamOptionsSetFlag { + SOPT_TRIGGER_TYPE_SET = BIT_FLAG_MASK(0), + SOPT_WATERMARK_SET = BIT_FLAG_MASK(1), + SOPT_DELETE_MARK_SET = BIT_FLAG_MASK(2), + SOPT_FILL_HISTORY_SET = BIT_FLAG_MASK(3), + SOPT_IGNORE_EXPIRED_SET = BIT_FLAG_MASK(4), + SOPT_IGNORE_UPDATE_SET = BIT_FLAG_MASK(5), +} EStreamOptionsSetFlag; + typedef struct SStreamOptions { ENodeType type; int8_t triggerType; @@ -402,6 +412,7 @@ typedef struct SStreamOptions { int8_t fillHistory; int8_t ignoreExpired; int8_t ignoreUpdate; + int64_t setFlag; } SStreamOptions; typedef struct SCreateStreamStmt { diff --git a/source/libs/parser/inc/parAst.h b/source/libs/parser/inc/parAst.h index 3b309db1db..4b9fa7aa42 100644 --- a/source/libs/parser/inc/parAst.h +++ b/source/libs/parser/inc/parAst.h @@ -215,6 +215,8 @@ SNode* createCreateFunctionStmt(SAstCreateContext* pCxt, bool ignoreExists, bool const SToken* pLibPath, SDataType dataType, int32_t bufSize); SNode* createDropFunctionStmt(SAstCreateContext* pCxt, bool ignoreNotExists, const SToken* pFuncName); SNode* createStreamOptions(SAstCreateContext* pCxt); +SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken, + SNode* pNode); SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable, SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols); SNode* createDropStreamStmt(SAstCreateContext* pCxt, bool ignoreNotExists, SToken* pStreamName); diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index fb6d3bfdd9..b2f85eb677 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -562,14 +562,14 @@ tag_def_or_ref_opt(A) ::= tags_def(B). tag_def_or_ref_opt(A) ::= TAGS NK_LP col_name_list(B) NK_RP. { A = B; } stream_options(A) ::= . { A = createStreamOptions(pCxt); } -stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_AT_ONCE; A = B; } -stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE. { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; A = B; } -stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY duration_literal(C). { ((SStreamOptions*)B)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)B)->pDelay = releaseRawExprNode(pCxt, C); A = B; } -stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { ((SStreamOptions*)B)->pWatermark = releaseRawExprNode(pCxt, C); A = B; } -stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreExpired = taosStr2Int8(C.z, NULL, 10); A = B; } -stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { ((SStreamOptions*)B)->fillHistory = taosStr2Int8(C.z, NULL, 10); A = B; } -stream_options(A) ::= stream_options(B) DELETE_MARK duration_literal(C). { ((SStreamOptions*)B)->pDeleteMark = releaseRawExprNode(pCxt, C); A = B; } -stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { ((SStreamOptions*)B)->ignoreUpdate = taosStr2Int8(C.z, NULL, 10); A = B; } +stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); } +stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); } +stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY(C) duration_literal(D). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, D); } +stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_WATERMARK_SET, NULL, releaseRawExprNode(pCxt, C)); } +stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_EXPIRED_SET, &C, NULL); } +stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_FILL_HISTORY_SET, &C, NULL); } +stream_options(A) ::= stream_options(B) DELETE_MARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_DELETE_MARK_SET, NULL, releaseRawExprNode(pCxt, C)); } +stream_options(A) ::= stream_options(B) IGNORE UPDATE NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_UPDATE_SET, &C, NULL); } subtable_opt(A) ::= . { A = NULL; } subtable_opt(A) ::= SUBTABLE NK_LP expression(B) NK_RP. { A = releaseRawExprNode(pCxt, B); } diff --git a/source/libs/parser/src/parAstCreater.c b/source/libs/parser/src/parAstCreater.c index 634a239399..469132613d 100644 --- a/source/libs/parser/src/parAstCreater.c +++ b/source/libs/parser/src/parAstCreater.c @@ -1817,6 +1817,59 @@ SNode* createStreamOptions(SAstCreateContext* pCxt) { return (SNode*)pOptions; } +static int8_t getTriggerType(uint32_t tokenType) { + switch (tokenType) { + case TK_AT_ONCE: + return STREAM_TRIGGER_AT_ONCE; + case TK_WINDOW_CLOSE: + return STREAM_TRIGGER_WINDOW_CLOSE; + case TK_MAX_DELAY: + return STREAM_TRIGGER_MAX_DELAY; + default: + break; + } + return STREAM_TRIGGER_WINDOW_CLOSE; +} + +SNode* setStreamOptions(SAstCreateContext* pCxt, SNode* pOptions, EStreamOptionsSetFlag setflag, SToken* pToken, + SNode* pNode) { + SStreamOptions* pStreamOptions = (SStreamOptions*)pOptions; + if (BIT_FLAG_TEST_MASK(setflag, pStreamOptions->setFlag)) { + pCxt->errCode = + generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_SYNTAX_ERROR, "stream options each item is only set once"); + return pOptions; + } + + switch (setflag) { + case SOPT_TRIGGER_TYPE_SET: + pStreamOptions->triggerType = getTriggerType(pToken->type); + if (STREAM_TRIGGER_MAX_DELAY == pStreamOptions->triggerType) { + pStreamOptions->pDelay = pNode; + } + break; + case SOPT_WATERMARK_SET: + pStreamOptions->pWatermark = pNode; + break; + case SOPT_DELETE_MARK_SET: + pStreamOptions->pDeleteMark = pNode; + break; + case SOPT_FILL_HISTORY_SET: + pStreamOptions->fillHistory = taosStr2Int8(pToken->z, NULL, 10); + break; + case SOPT_IGNORE_EXPIRED_SET: + pStreamOptions->ignoreExpired = taosStr2Int8(pToken->z, NULL, 10); + break; + case SOPT_IGNORE_UPDATE_SET: + pStreamOptions->ignoreUpdate = taosStr2Int8(pToken->z, NULL, 10); + break; + default: + break; + } + BIT_FLAG_SET_MASK(pStreamOptions->setFlag, setflag); + + return pOptions; +} + SNode* createCreateStreamStmt(SAstCreateContext* pCxt, bool ignoreExists, SToken* pStreamName, SNode* pRealTable, SNode* pOptions, SNodeList* pTags, SNode* pSubtable, SNode* pQuery, SNodeList* pCols) { CHECK_PARSER_STATUS(pCxt); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 69d48a99d7..9f1201a31a 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6380,15 +6380,15 @@ static int32_t translateDropFunction(STranslateContext* pCxt, SDropFunctionStmt* static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) { SAlterUserReq req = {0}; - if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) || - (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) && - PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) { + if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) || + (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) && + BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) { req.alterType = TSDB_ALTER_USER_ADD_ALL_DB; - } else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) { + } else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) { req.alterType = TSDB_ALTER_USER_ADD_READ_DB; - } else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) { + } else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) { req.alterType = TSDB_ALTER_USER_ADD_WRITE_DB; - } else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) { + } else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) { req.alterType = TSDB_ALTER_USER_ADD_SUBSCRIBE_TOPIC; } strcpy(req.user, pStmt->userName); @@ -6398,15 +6398,15 @@ static int32_t translateGrant(STranslateContext* pCxt, SGrantStmt* pStmt) { static int32_t translateRevoke(STranslateContext* pCxt, SRevokeStmt* pStmt) { SAlterUserReq req = {0}; - if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) || - (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) && - PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) { + if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_ALL) || + (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ) && + BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE))) { req.alterType = TSDB_ALTER_USER_REMOVE_ALL_DB; - } else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) { + } else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_READ)) { req.alterType = TSDB_ALTER_USER_REMOVE_READ_DB; - } else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) { + } else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_WRITE)) { req.alterType = TSDB_ALTER_USER_REMOVE_WRITE_DB; - } else if (PRIVILEGE_TYPE_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) { + } else if (BIT_FLAG_TEST_MASK(pStmt->privileges, PRIVILEGE_TYPE_SUBSCRIBE)) { req.alterType = TSDB_ALTER_USER_REMOVE_SUBSCRIBE_TOPIC; } strcpy(req.user, pStmt->userName); @@ -6478,11 +6478,11 @@ static int32_t translateShowCreateDatabase(STranslateContext* pCxt, SShowCreateD if (NULL == pStmt->pCfg) { return TSDB_CODE_OUT_OF_MEMORY; } - + SName name; tNameSetDbName(&name, pCxt->pParseCxt->acctId, pStmt->dbName, strlen(pStmt->dbName)); tNameGetFullDbName(&name, pStmt->dbFName); - + return getDBCfg(pCxt, pStmt->dbName, (SDbCfgInfo*)pStmt->pCfg); } diff --git a/source/libs/parser/src/sql.c b/source/libs/parser/src/sql.c index b6940f8395..92bc419c48 100644 --- a/source/libs/parser/src/sql.c +++ b/source/libs/parser/src/sql.c @@ -4608,7 +4608,6 @@ static YYACTIONTYPE yy_reduce( { yymsp[1].minor.yy42 = createStreamOptions(pCxt); } break; case 279: /* sma_stream_opt ::= sma_stream_opt WATERMARK duration_literal */ - case 316: /* stream_options ::= stream_options WATERMARK duration_literal */ yytestcase(yyruleno==316); { ((SStreamOptions*)yymsp[-2].minor.yy42)->pWatermark = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-2].minor.yy42; } yymsp[-2].minor.yy42 = yylhsminor.yy42; break; @@ -4617,7 +4616,6 @@ static YYACTIONTYPE yy_reduce( yymsp[-2].minor.yy42 = yylhsminor.yy42; break; case 281: /* sma_stream_opt ::= sma_stream_opt DELETE_MARK duration_literal */ - case 319: /* stream_options ::= stream_options DELETE_MARK duration_literal */ yytestcase(yyruleno==319); { ((SStreamOptions*)yymsp[-2].minor.yy42)->pDeleteMark = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-2].minor.yy42; } yymsp[-2].minor.yy42 = yylhsminor.yy42; break; @@ -4677,27 +4675,32 @@ static YYACTIONTYPE yy_reduce( { pCxt->pRootNode = createDropStreamStmt(pCxt, yymsp[-1].minor.yy103, &yymsp[0].minor.yy225); } break; case 313: /* stream_options ::= stream_options TRIGGER AT_ONCE */ -{ ((SStreamOptions*)yymsp[-2].minor.yy42)->triggerType = STREAM_TRIGGER_AT_ONCE; yylhsminor.yy42 = yymsp[-2].minor.yy42; } - yymsp[-2].minor.yy42 = yylhsminor.yy42; - break; - case 314: /* stream_options ::= stream_options TRIGGER WINDOW_CLOSE */ -{ ((SStreamOptions*)yymsp[-2].minor.yy42)->triggerType = STREAM_TRIGGER_WINDOW_CLOSE; yylhsminor.yy42 = yymsp[-2].minor.yy42; } + case 314: /* stream_options ::= stream_options TRIGGER WINDOW_CLOSE */ yytestcase(yyruleno==314); +{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_TRIGGER_TYPE_SET, &yymsp[0].minor.yy0, NULL); } yymsp[-2].minor.yy42 = yylhsminor.yy42; break; case 315: /* stream_options ::= stream_options TRIGGER MAX_DELAY duration_literal */ -{ ((SStreamOptions*)yymsp[-3].minor.yy42)->triggerType = STREAM_TRIGGER_MAX_DELAY; ((SStreamOptions*)yymsp[-3].minor.yy42)->pDelay = releaseRawExprNode(pCxt, yymsp[0].minor.yy42); yylhsminor.yy42 = yymsp[-3].minor.yy42; } +{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_TRIGGER_TYPE_SET, &yymsp[-1].minor.yy0, yymsp[0].minor.yy42); } yymsp[-3].minor.yy42 = yylhsminor.yy42; break; + case 316: /* stream_options ::= stream_options WATERMARK duration_literal */ +{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_WATERMARK_SET, NULL, releaseRawExprNode(pCxt, yymsp[0].minor.yy42)); } + yymsp[-2].minor.yy42 = yylhsminor.yy42; + break; case 317: /* stream_options ::= stream_options IGNORE EXPIRED NK_INTEGER */ -{ ((SStreamOptions*)yymsp[-3].minor.yy42)->ignoreExpired = taosStr2Int8(yymsp[0].minor.yy0.z, NULL, 10); yylhsminor.yy42 = yymsp[-3].minor.yy42; } +{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_IGNORE_EXPIRED_SET, &yymsp[0].minor.yy0, NULL); } yymsp[-3].minor.yy42 = yylhsminor.yy42; break; case 318: /* stream_options ::= stream_options FILL_HISTORY NK_INTEGER */ -{ ((SStreamOptions*)yymsp[-2].minor.yy42)->fillHistory = taosStr2Int8(yymsp[0].minor.yy0.z, NULL, 10); yylhsminor.yy42 = yymsp[-2].minor.yy42; } +{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_FILL_HISTORY_SET, &yymsp[0].minor.yy0, NULL); } + yymsp[-2].minor.yy42 = yylhsminor.yy42; + break; + case 319: /* stream_options ::= stream_options DELETE_MARK duration_literal */ +{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-2].minor.yy42, SOPT_DELETE_MARK_SET, NULL, releaseRawExprNode(pCxt, yymsp[0].minor.yy42)); } yymsp[-2].minor.yy42 = yylhsminor.yy42; break; case 320: /* stream_options ::= stream_options IGNORE UPDATE NK_INTEGER */ -{ ((SStreamOptions*)yymsp[-3].minor.yy42)->ignoreUpdate = taosStr2Int8(yymsp[0].minor.yy0.z, NULL, 10); yylhsminor.yy42 = yymsp[-3].minor.yy42; } +{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_IGNORE_UPDATE_SET, &yymsp[0].minor.yy0, NULL); } yymsp[-3].minor.yy42 = yylhsminor.yy42; break; case 322: /* subtable_opt ::= SUBTABLE NK_LP expression NK_RP */ From e937c7727579c84b47168a9e39080fb9f437fb91 Mon Sep 17 00:00:00 2001 From: Xiaoyu Wang Date: Fri, 3 Mar 2023 18:35:51 +0800 Subject: [PATCH 02/10] fix: create stream syntax check --- source/libs/parser/inc/sql.y | 2 +- source/libs/parser/src/sql.c | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index b2f85eb677..317bfccb78 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -564,7 +564,7 @@ tag_def_or_ref_opt(A) ::= TAGS NK_LP col_name_list(B) NK_RP. stream_options(A) ::= . { A = createStreamOptions(pCxt); } stream_options(A) ::= stream_options(B) TRIGGER AT_ONCE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); } stream_options(A) ::= stream_options(B) TRIGGER WINDOW_CLOSE(C). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, NULL); } -stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY(C) duration_literal(D). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, D); } +stream_options(A) ::= stream_options(B) TRIGGER MAX_DELAY(C) duration_literal(D). { A = setStreamOptions(pCxt, B, SOPT_TRIGGER_TYPE_SET, &C, releaseRawExprNode(pCxt, D)); } stream_options(A) ::= stream_options(B) WATERMARK duration_literal(C). { A = setStreamOptions(pCxt, B, SOPT_WATERMARK_SET, NULL, releaseRawExprNode(pCxt, C)); } stream_options(A) ::= stream_options(B) IGNORE EXPIRED NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_IGNORE_EXPIRED_SET, &C, NULL); } stream_options(A) ::= stream_options(B) FILL_HISTORY NK_INTEGER(C). { A = setStreamOptions(pCxt, B, SOPT_FILL_HISTORY_SET, &C, NULL); } diff --git a/source/libs/parser/src/sql.c b/source/libs/parser/src/sql.c index 92bc419c48..04f838b92d 100644 --- a/source/libs/parser/src/sql.c +++ b/source/libs/parser/src/sql.c @@ -4680,7 +4680,7 @@ static YYACTIONTYPE yy_reduce( yymsp[-2].minor.yy42 = yylhsminor.yy42; break; case 315: /* stream_options ::= stream_options TRIGGER MAX_DELAY duration_literal */ -{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_TRIGGER_TYPE_SET, &yymsp[-1].minor.yy0, yymsp[0].minor.yy42); } +{ yylhsminor.yy42 = setStreamOptions(pCxt, yymsp[-3].minor.yy42, SOPT_TRIGGER_TYPE_SET, &yymsp[-1].minor.yy0, releaseRawExprNode(pCxt, yymsp[0].minor.yy42)); } yymsp[-3].minor.yy42 = yylhsminor.yy42; break; case 316: /* stream_options ::= stream_options WATERMARK duration_literal */ From b91734cf91664487302d21d55125db986d04422c Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Sat, 4 Mar 2023 09:51:34 +0800 Subject: [PATCH 03/10] enh: sync log retention of mnode configurable with mndLogRetention --- include/common/tglobal.h | 1 + include/libs/sync/sync.h | 1 - source/common/src/tglobal.c | 5 ++++- source/libs/sync/src/syncMain.c | 2 +- 4 files changed, 6 insertions(+), 3 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 86395c44da..ac75b84762 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -76,6 +76,7 @@ extern int64_t tsVndCommitMaxIntervalMs; // mnode extern int64_t tsMndSdbWriteDelta; +extern int64_t tsMndLogRetention; // monitor extern bool tsEnableMonitor; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 5e37da4f3f..189484d1a6 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -35,7 +35,6 @@ extern "C" { #define SYNC_MAX_RECV_TIME_RANGE_MS 1200 #define SYNC_DEL_WAL_MS (1000 * 60) #define SYNC_ADD_QUORUM_COUNT 3 -#define SYNC_MNODE_LOG_RETENTION 10000 #define SYNC_VNODE_LOG_RETENTION (TSDB_SYNC_LOG_BUFFER_RETENTION + 1) #define SNAPSHOT_MAX_CLOCK_SKEW_MS 1000 * 10 #define SNAPSHOT_WAIT_MS 1000 * 30 diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 023852c9fc..cd5486ea25 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -66,7 +66,8 @@ int32_t tsHeartbeatTimeout = 20 * 1000; int64_t tsVndCommitMaxIntervalMs = 600 * 1000; // mnode -int64_t tsMndSdbWriteDelta = 2000; +int64_t tsMndSdbWriteDelta = 200; +int64_t tsMndLogRetention = 2000; // monitor bool tsEnableMonitor = true; @@ -461,6 +462,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { if (cfgAddInt64(pCfg, "vndCommitMaxInterval", tsVndCommitMaxIntervalMs, 1000, 1000 * 60 * 60, 0) != 0) return -1; if (cfgAddInt64(pCfg, "mndSdbWriteDelta", tsMndSdbWriteDelta, 20, 10000, 0) != 0) return -1; + if (cfgAddInt64(pCfg, "mndLogRetention", tsMndLogRetention, 500, 10000, 0) != 0) return -1; if (cfgAddBool(pCfg, "monitor", tsEnableMonitor, 0) != 0) return -1; if (cfgAddInt32(pCfg, "monitorInterval", tsMonitorInterval, 1, 200000, 0) != 0) return -1; @@ -810,6 +812,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { tsVndCommitMaxIntervalMs = cfgGetItem(pCfg, "vndCommitMaxInterval")->i64; tsMndSdbWriteDelta = cfgGetItem(pCfg, "mndSdbWriteDelta")->i64; + tsMndLogRetention = cfgGetItem(pCfg, "mndLogRetention")->i64; tsStartUdfd = cfgGetItem(pCfg, "udf")->bval; tstrncpy(tsUdfdResFuncs, cfgGetItem(pCfg, "udfdResFuncs")->str, sizeof(tsUdfdResFuncs)); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index b07c05dcfe..9601cd6ab0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -285,7 +285,7 @@ int32_t syncBeginSnapshot(int64_t rid, int64_t lastApplyIndex) { if (syncNodeIsMnode(pSyncNode)) { // mnode - logRetention = SYNC_MNODE_LOG_RETENTION; + logRetention = tsMndLogRetention; } else { // vnode if (pSyncNode->replicaNum > 1) { From e54e12eff0c8d5da57919d2c292a71335fad512a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 4 Mar 2023 11:57:27 +0800 Subject: [PATCH 04/10] fix:give error if col is same in schemless & fix json parse error in TD-22903 --- source/client/src/clientSml.c | 2 + source/client/src/clientSmlJson.c | 2 +- source/common/src/tglobal.c | 1 + source/dnode/vnode/src/vnd/vnodeSync.c | 2 +- utils/test/c/sml_test.c | 65 ++++++++++++++++++++++++-- 5 files changed, 66 insertions(+), 6 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index b83e7ee976..3314932272 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1141,6 +1141,7 @@ static int32_t smlPushCols(SArray *colsArray, SArray *cols) { for (size_t i = 0; i < taosArrayGetSize(cols); i++) { SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); + if(terrno == TSDB_CODE_DUP_KEY){return terrno;} } taosArrayPush(colsArray, &kvHash); @@ -1204,6 +1205,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) { SSmlSTableMeta *meta = smlBuildSTableMeta(info->dataFormat); smlInsertMeta(meta->tagHash, meta->tags, tinfo->tags); + if(terrno == TSDB_CODE_DUP_KEY){return terrno;} smlInsertMeta(meta->colHash, meta->cols, elements->colArray); taosHashPut(info->superTables, elements->measure, elements->measureLen, &meta, POINTER_BYTES); } diff --git a/source/client/src/clientSmlJson.c b/source/client/src/clientSmlJson.c index e89227d412..b9a1cd00a1 100644 --- a/source/client/src/clientSmlJson.c +++ b/source/client/src/clientSmlJson.c @@ -1214,7 +1214,7 @@ static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo * return TSDB_CODE_INVALID_TIMESTAMP; } else if (elements->timestamp[0] == '{') { char tmp = elements->timestamp[elements->timestampLen]; - elements->cols[elements->timestampLen] = '\0'; + elements->timestamp[elements->timestampLen] = '\0'; cJSON *tsJson = cJSON_Parse(elements->timestamp); ts = smlParseTSFromJSON(info, tsJson); if (unlikely(ts < 0)) { diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 4bb64e5fd6..1535c80feb 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -1246,6 +1246,7 @@ int32_t taosCreateLog(const char *logname, int32_t logFileNum, const char *cfgDi taosSetAllDebugFlag(cfgGetItem(pCfg, "debugFlag")->i32, false); if (taosMulModeMkDir(tsLogDir, 0777) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); uError("failed to create dir:%s since %s", tsLogDir, terrstr()); cfgCleanup(pCfg); return -1; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 1e96a76170..e71b03d2af 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -292,7 +292,7 @@ void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) code = vnodePreProcessWriteMsg(pVnode, pMsg); if (code != 0) { - vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, terrstr()); + vGError("vgId:%d, msg:%p failed to pre-process since %s", vgId, pMsg, tstrerror(code)); if (terrno != 0) code = terrno; vnodeHandleProposeError(pVnode, pMsg, code); rpcFreeCont(pMsg->pCont); diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index c36ab38877..b8d018fc82 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -137,6 +137,8 @@ int smlProcess_json1_Test() { for (int i = 0; i < 1; i++) { taosMemoryFree(sql1[i]); } + ASSERT(code == 0); + const char *sql2[] = { "[{\"metric\":\"sys.cpu.nice\",\"timestamp\":1662344041,\"value\":13,\"tags\":{\"host\":\"web01\",\"dc\":\"lga\"}" @@ -164,6 +166,34 @@ int smlProcess_json1_Test() { taosMemoryFree(sql3[i]); } + ASSERT(code == 0); + + + // TD-22903 + const char *sql4[] = { + "[{\"metric\": \"test_us\", \"timestamp\": {\"value\": 1626006833639, \"type\": \"ms\"}, \"value\": true, \"tags\": {\"t0\": true}}, {\"metric\": \"test_us\", \"timestamp\": {\"value\": 1626006833638, \"type\": \"ms\"}, \"value\": false, \"tags\": {\"t0\": true}}]" + }; + char *sql5[1] = {0}; + for (int i = 0; i < 1; i++) { + sql5[i] = taosMemoryCalloc(1, 1024); + strncpy(sql5[i], sql4[i], 1023); + } + + pRes = taos_schemaless_insert(taos, (char **)sql5, sizeof(sql5) / sizeof(sql5[0]), TSDB_SML_JSON_PROTOCOL, + TSDB_SML_TIMESTAMP_NANO_SECONDS); + code = taos_errno(pRes); + if (code != 0) { + printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); + } else { + printf("%s result:success\n", __FUNCTION__); + } + taos_free_result(pRes); + + for (int i = 0; i < 1; i++) { + taosMemoryFree(sql5[i]); + } + ASSERT(code == 0); + taos_close(taos); return code; @@ -927,6 +957,31 @@ int sml_ts2164_Test() { return code; } +int sml_td22900_Test() { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + + TAOS_RES *pRes = + taos_query(taos, "CREATE DATABASE IF NOT EXISTS line_test BUFFER 384 MINROWS 1000 PAGES 256 PRECISION 'ns'"); + taos_free_result(pRes); + + const char *sql[] = { + "qddkgilwfu,id=qddkgilwfu_42383_49198,t0=t,t1=127i8 c4=9223372036854775807i64,c6=11.12345f32,c6=22.123456789f64 1626006833639" + }; + + pRes = taos_query(taos, "use line_test"); + taos_free_result(pRes); + + pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_MILLI_SECONDS); + + printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); + int code = taos_errno(pRes); + taos_free_result(pRes); + taos_close(taos); + + return code; +} + int sml_ttl_Test() { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -1096,16 +1151,18 @@ int main(int argc, char *argv[]) { } int ret = 0; - ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file - ASSERT(!ret); +// ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file +// ASSERT(!ret); // for(int i = 0; i < sizeof(str)/sizeof(str[0]); i++){ // printf("str:%s \t %d\n", str[i], smlCalTypeSum(str[i], strlen(str[i]))); // } // int ret = 0; - ret = sml_ttl_Test(); - ASSERT(!ret); +// ret = sml_ttl_Test(); +// ASSERT(!ret); ret = sml_ts2164_Test(); ASSERT(!ret); + ret = sml_td22900_Test(); + ASSERT(ret); ret = smlProcess_influx_Test(); ASSERT(!ret); ret = smlProcess_telnet_Test(); From 3bfd156616fdbf8ae237bee916c5d90d89f07b9f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 4 Mar 2023 17:37:38 +0800 Subject: [PATCH 05/10] fix:[TD-22898]:modify schema if schema change --- source/client/src/clientSml.c | 3 +- source/client/src/clientSmlJson.c | 82 +++++++------------------- source/client/src/clientSmlLine.c | 89 ++++++++--------------------- source/client/src/clientSmlTelnet.c | 85 +++++++-------------------- utils/test/c/sml_test.c | 46 ++++++++++++++- 5 files changed, 111 insertions(+), 194 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 3314932272..ca76579b9a 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -817,6 +817,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { if (i < pTableMeta->tableInfo.numOfColumns) { taosArrayPush(pColumns, &field); } else { + uError("SML:0x%" PRIx64 "field name:%s, bytes:%d", info->id, field.name, field.bytes); taosArrayPush(pTags, &field); } } @@ -1073,7 +1074,6 @@ void smlDestroyInfo(SSmlHandle *info) { taosArrayDestroy(info->valueJsonArray); taosArrayDestroy(info->preLineTagKV); - taosArrayDestroy(info->maxTagKVs); taosArrayDestroy(info->preLineColKV); if (!info->dataFormat) { @@ -1117,7 +1117,6 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) { info->tagJsonArray = taosArrayInit(8, POINTER_BYTES); info->valueJsonArray = taosArrayInit(8, POINTER_BYTES); info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv)); - info->maxTagKVs = taosArrayInit(8, sizeof(SSmlKv)); info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv)); if (NULL == info->pVgHash || NULL == info->childTables || NULL == info->superTables) { diff --git a/source/client/src/clientSmlJson.c b/source/client/src/clientSmlJson.c index b9a1cd00a1..22fd762960 100644 --- a/source/client/src/clientSmlJson.c +++ b/source/client/src/clientSmlJson.c @@ -683,9 +683,6 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo int cnt = 0; SArray *preLineKV = info->preLineTagKV; - SArray *maxKVs = info->maxTagKVs; - bool isSuperKVInit = true; - SArray *superKV = NULL; if (info->dataFormat) { if (unlikely(!isSameMeasure)) { SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); @@ -701,17 +698,15 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo sMeta->tableMeta = pTableMeta; taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES); tmp = &sMeta; + for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){ + SSchema *tag = pTableMeta->schema + i; + SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type, .length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE }; + taosArrayPush(sMeta->tags, &kv); + } } info->currSTableMeta = (*tmp)->tableMeta; - superKV = (*tmp)->tags; - - if (unlikely(taosArrayGetSize(superKV) == 0)) { - isSuperKVInit = false; - } - taosArrayClear(maxKVs); + info->maxTagKVs = (*tmp)->tags; } - } else { - taosArrayClear(maxKVs); } taosArrayClear(preLineKV); @@ -747,58 +742,21 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo return TSDB_CODE_SUCCESS; } - if (isSameMeasure) { - if (unlikely(cnt >= taosArrayGetSize(maxKVs))) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt); - if (unlikely(kv.length > maxKV->length)) { - maxKV->length = kv.length; - SSmlSTableMeta **tableMeta = - (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); - if (unlikely(NULL == tableMeta)) { - uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id); - return TSDB_CODE_SML_INTERNAL_ERROR; - } - - SSmlKv *oldKV = (SSmlKv *)taosArrayGet((*tableMeta)->tags, cnt); - oldKV->length = kv.length; - info->needModifySchema = true; - } - if (unlikely(!IS_SAME_KEY)) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - } else { - if (isSuperKVInit) { - if (unlikely(cnt >= taosArrayGetSize(superKV))) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - SSmlKv *maxKV = (SSmlKv *)taosArrayGet(superKV, cnt); - if (unlikely(kv.length > maxKV->length)) { - maxKV->length = kv.length; - } else { - kv.length = maxKV->length; - } - info->needModifySchema = true; - - if (unlikely(!IS_SAME_KEY)) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - } else { - taosArrayPush(superKV, &kv); - } - taosArrayPush(maxKVs, &kv); + if (unlikely(cnt >= taosArrayGetSize(info->maxTagKVs))) { + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxTagKVs, cnt); + if (unlikely(!IS_SAME_KEY)) { + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + if (unlikely(kv.length > maxKV->length)) { + maxKV->length = kv.length; + info->needModifySchema = true; } - } else { - taosArrayPush(maxKVs, &kv); } taosArrayPush(preLineKV, &kv); cnt++; diff --git a/source/client/src/clientSmlLine.c b/source/client/src/clientSmlLine.c index a2f752edb9..c2fe69c875 100644 --- a/source/client/src/clientSmlLine.c +++ b/source/client/src/clientSmlLine.c @@ -144,14 +144,11 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin int cnt = 0; SArray *preLineKV = info->preLineTagKV; - SArray *maxKVs = info->maxTagKVs; - bool isSuperKVInit = true; - SArray *superKV = NULL; if (info->dataFormat) { if (unlikely(!isSameMeasure)) { SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen); - SSmlSTableMeta *sMeta = NULL; + if (unlikely(tmp == NULL)) { STableMeta *pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen); if (pTableMeta == NULL) { @@ -159,21 +156,20 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin info->reRun = true; return TSDB_CODE_SUCCESS; } - sMeta = smlBuildSTableMeta(info->dataFormat); + SSmlSTableMeta *sMeta = smlBuildSTableMeta(info->dataFormat); sMeta->tableMeta = pTableMeta; taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &sMeta, POINTER_BYTES); tmp = &sMeta; + + for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){ + SSchema *tag = pTableMeta->schema + i; + SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type, .length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE }; + taosArrayPush(sMeta->tags, &kv); + } } info->currSTableMeta = (*tmp)->tableMeta; - superKV = (*tmp)->tags; - - if (unlikely(taosArrayGetSize(superKV) == 0)) { - isSuperKVInit = false; - } - taosArrayClear(maxKVs); + info->maxTagKVs = (*tmp)->tags; } - } else { - taosArrayClear(maxKVs); } taosArrayClear(preLineKV); @@ -252,58 +248,23 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin return TSDB_CODE_SUCCESS; } - if (isSameMeasure) { - if (unlikely(cnt >= taosArrayGetSize(maxKVs))) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt); - if (unlikely(kv.length > maxKV->length)) { - maxKV->length = kv.length; - SSmlSTableMeta **tableMeta = - (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen); - if (unlikely(NULL == tableMeta)) { - uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id); - return TSDB_CODE_SML_INTERNAL_ERROR; - } - - SSmlKv *oldKV = (SSmlKv *)taosArrayGet((*tableMeta)->tags, cnt); - oldKV->length = kv.length; - info->needModifySchema = true; - } - if (unlikely(!IS_SAME_KEY)) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - } else { - if (isSuperKVInit) { - if (unlikely(cnt >= taosArrayGetSize(superKV))) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - SSmlKv *maxKV = (SSmlKv *)taosArrayGet(superKV, cnt); - if (unlikely(kv.length > maxKV->length)) { - maxKV->length = kv.length; - } else { - kv.length = maxKV->length; - } - info->needModifySchema = true; - - if (unlikely(!IS_SAME_KEY)) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - } else { - taosArrayPush(superKV, &kv); - } - taosArrayPush(maxKVs, &kv); + if (unlikely(cnt >= taosArrayGetSize(info->maxTagKVs))) { + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxTagKVs, cnt); + + if (unlikely(!IS_SAME_KEY)) { + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + + if (unlikely(kv.length > maxKV->length)) { + maxKV->length = kv.length; + info->needModifySchema = true; } - } else { - taosArrayPush(maxKVs, &kv); } taosArrayPush(preLineKV, &kv); diff --git a/source/client/src/clientSmlTelnet.c b/source/client/src/clientSmlTelnet.c index ab071305fa..9201412cbd 100644 --- a/source/client/src/clientSmlTelnet.c +++ b/source/client/src/clientSmlTelnet.c @@ -79,13 +79,10 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS int cnt = 0; SArray *preLineKV = info->preLineTagKV; - SArray *maxKVs = info->maxTagKVs; - bool isSuperKVInit = true; - SArray *superKV = NULL; if (info->dataFormat) { if (!isSameMeasure) { SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); - SSmlSTableMeta *sMeta = NULL; + SSmlSTableMeta * sMeta = NULL; if (unlikely(tmp == NULL)) { STableMeta *pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen); if (pTableMeta == NULL) { @@ -97,17 +94,15 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS sMeta->tableMeta = pTableMeta; taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES); tmp = &sMeta; + for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){ + SSchema *tag = pTableMeta->schema + i; + SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type, .length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE }; + taosArrayPush(sMeta->tags, &kv); + } } info->currSTableMeta = (*tmp)->tableMeta; - superKV = (*tmp)->tags; - - if (unlikely(taosArrayGetSize(superKV) == 0)) { - isSuperKVInit = false; - } - taosArrayClear(maxKVs); + info->maxTagKVs = (*tmp)->tags; } - } else { - taosArrayClear(maxKVs); } taosArrayClear(preLineKV); @@ -175,59 +170,21 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS info->reRun = true; return TSDB_CODE_SUCCESS; } - - if (isSameMeasure) { - if (unlikely(cnt >= taosArrayGetSize(maxKVs))) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - SSmlKv *maxKV = (SSmlKv *)taosArrayGet(maxKVs, cnt); - if (unlikely(kv.length > maxKV->length)) { - maxKV->length = kv.length; - SSmlSTableMeta **tableMeta = - (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); - if (unlikely(NULL == tableMeta)) { - uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id); - return TSDB_CODE_SML_INTERNAL_ERROR; - } - - SSmlKv *oldKV = (SSmlKv *)taosArrayGet((*tableMeta)->tags, cnt); - oldKV->length = kv.length; - info->needModifySchema = true; - } - if (unlikely(!IS_SAME_KEY)) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - } else { - if (isSuperKVInit) { - if (unlikely(cnt >= taosArrayGetSize(superKV))) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - SSmlKv *maxKV = (SSmlKv *)taosArrayGet(superKV, cnt); - if (unlikely(kv.length > maxKV->length)) { - maxKV->length = kv.length; - } else { - kv.length = maxKV->length; - } - info->needModifySchema = true; - - if (unlikely(!IS_SAME_KEY)) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - } else { - taosArrayPush(superKV, &kv); - } - taosArrayPush(maxKVs, &kv); + if (unlikely(cnt >= taosArrayGetSize(info->maxTagKVs))) { + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->maxTagKVs, cnt); + if (unlikely(!IS_SAME_KEY)) { + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + if (unlikely(kv.length > maxKV->length)) { + maxKV->length = kv.length; + info->needModifySchema = true; } - } else { - taosArrayPush(maxKVs, &kv); } taosArrayPush(preLineKV, &kv); cnt++; diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index b8d018fc82..7441184271 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -957,6 +957,46 @@ int sml_ts2164_Test() { return code; } +int sml_td22898_Test() { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + + TAOS_RES *pRes = + taos_query(taos, "CREATE DATABASE IF NOT EXISTS line_test BUFFER 384 MINROWS 1000 PAGES 256 PRECISION 'ns'"); + taos_free_result(pRes); + + const char *sql[] = { + "svlzxdfutx,id=nyavpjyfas,t0=t,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=t,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64 1626006833639" + }; + + pRes = taos_query(taos, "use line_test"); + taos_free_result(pRes); + + pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_MILLI_SECONDS); + + printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); + int code = taos_errno(pRes); + taos_free_result(pRes); + + const char *sql1[] = { + "svlzxdfutx,id=nyavpjyfas,t0=True,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"tgqkvsws\",t8=L\"ncharTagValue\" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"htvnnldm\",c8=L\"ncharColValue\",c9=7u64 1626006833639" + }; + + pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_MILLI_SECONDS); + + printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes)); + code = taos_errno(pRes); + taos_free_result(pRes); + + pRes = taos_query(taos, "select * from svlzxdfutx"); + taos_free_result(pRes); + + taos_close(taos); + + return code; +} + int sml_td22900_Test() { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -1157,10 +1197,12 @@ int main(int argc, char *argv[]) { // printf("str:%s \t %d\n", str[i], smlCalTypeSum(str[i], strlen(str[i]))); // } // int ret = 0; -// ret = sml_ttl_Test(); -// ASSERT(!ret); + ret = sml_ttl_Test(); + ASSERT(!ret); ret = sml_ts2164_Test(); ASSERT(!ret); + ret = sml_td22898_Test(); + ASSERT(!ret); ret = sml_td22900_Test(); ASSERT(ret); ret = smlProcess_influx_Test(); From 4827c25a6169fff230ebe2a29fbc30ad0987162f Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 4 Mar 2023 18:16:41 +0800 Subject: [PATCH 06/10] fix:give error if col is same in schemless & fix json parse error in TD-22903 --- source/client/inc/clientSml.h | 2 +- source/client/src/clientSml.c | 3 - source/client/src/clientSmlJson.c | 4 +- source/client/src/clientSmlLine.c | 115 +++++++++------------------- source/client/src/clientSmlTelnet.c | 4 +- 5 files changed, 42 insertions(+), 86 deletions(-) diff --git a/source/client/inc/clientSml.h b/source/client/inc/clientSml.h index 3dcba673bf..d8d41d8be6 100644 --- a/source/client/inc/clientSml.h +++ b/source/client/inc/clientSml.h @@ -190,7 +190,7 @@ typedef struct { // SArray *preLineTagKV; SArray *maxTagKVs; - SArray *preLineColKV; + SArray *masColKVs; SSmlLineInfo preLine; STableMeta *currSTableMeta; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index ca76579b9a..c438196e71 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -817,7 +817,6 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { if (i < pTableMeta->tableInfo.numOfColumns) { taosArrayPush(pColumns, &field); } else { - uError("SML:0x%" PRIx64 "field name:%s, bytes:%d", info->id, field.name, field.bytes); taosArrayPush(pTags, &field); } } @@ -1074,7 +1073,6 @@ void smlDestroyInfo(SSmlHandle *info) { taosArrayDestroy(info->valueJsonArray); taosArrayDestroy(info->preLineTagKV); - taosArrayDestroy(info->preLineColKV); if (!info->dataFormat) { for (int i = 0; i < info->lineNum; i++) { @@ -1117,7 +1115,6 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) { info->tagJsonArray = taosArrayInit(8, POINTER_BYTES); info->valueJsonArray = taosArrayInit(8, POINTER_BYTES); info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv)); - info->preLineColKV = taosArrayInit(8, sizeof(SSmlKv)); if (NULL == info->pVgHash || NULL == info->childTables || NULL == info->superTables) { uError("create SSmlHandle failed"); diff --git a/source/client/src/clientSmlJson.c b/source/client/src/clientSmlJson.c index 22fd762960..da82d43950 100644 --- a/source/client/src/clientSmlJson.c +++ b/source/client/src/clientSmlJson.c @@ -686,7 +686,7 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo if (info->dataFormat) { if (unlikely(!isSameMeasure)) { SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); - SSmlSTableMeta *sMeta = NULL; + SSmlSTableMeta *sMeta = NULL; if (unlikely(tmp == NULL)) { STableMeta *pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen); if (pTableMeta == NULL) { @@ -697,12 +697,12 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo sMeta = smlBuildSTableMeta(info->dataFormat); sMeta->tableMeta = pTableMeta; taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES); - tmp = &sMeta; for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){ SSchema *tag = pTableMeta->schema + i; SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type, .length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE }; taosArrayPush(sMeta->tags, &kv); } + tmp = &sMeta; } info->currSTableMeta = (*tmp)->tableMeta; info->maxTagKVs = (*tmp)->tags; diff --git a/source/client/src/clientSmlLine.c b/source/client/src/clientSmlLine.c index c2fe69c875..66f1316cd5 100644 --- a/source/client/src/clientSmlLine.c +++ b/source/client/src/clientSmlLine.c @@ -149,6 +149,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen); + SSmlSTableMeta *sMeta = NULL; if (unlikely(tmp == NULL)) { STableMeta *pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen); if (pTableMeta == NULL) { @@ -156,16 +157,15 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin info->reRun = true; return TSDB_CODE_SUCCESS; } - SSmlSTableMeta *sMeta = smlBuildSTableMeta(info->dataFormat); + sMeta = smlBuildSTableMeta(info->dataFormat); sMeta->tableMeta = pTableMeta; taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &sMeta, POINTER_BYTES); - tmp = &sMeta; - for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){ SSchema *tag = pTableMeta->schema + i; SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type, .length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE }; taosArrayPush(sMeta->tags, &kv); } + tmp = &sMeta; } info->currSTableMeta = (*tmp)->tableMeta; info->maxTagKVs = (*tmp)->tags; @@ -305,9 +305,6 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLineInfo *currElement, bool isSameMeasure, bool isSameCTable) { int cnt = 0; - SArray *preLineKV = info->preLineColKV; - bool isSuperKVInit = true; - SArray *superKV = NULL; if (info->dataFormat) { if (unlikely(!isSameCTable)) { SSmlTableInfo **oneTable = @@ -322,7 +319,6 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin if (unlikely(!isSameMeasure)) { SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen); - SSmlSTableMeta *sMeta = NULL; if (unlikely(tmp == NULL)) { STableMeta *pTableMeta = smlGetMeta(info, currElement->measure, currElement->measureLen); if (pTableMeta == NULL) { @@ -330,17 +326,23 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin info->reRun = true; return TSDB_CODE_SUCCESS; } - sMeta = smlBuildSTableMeta(info->dataFormat); - sMeta->tableMeta = pTableMeta; - taosHashPut(info->superTables, currElement->measure, currElement->measureLen, &sMeta, POINTER_BYTES); - tmp = &sMeta; + *tmp = smlBuildSTableMeta(info->dataFormat); + (*tmp)->tableMeta = pTableMeta; + taosHashPut(info->superTables, currElement->measure, currElement->measureLen, tmp, POINTER_BYTES); + + for(int i = 0; i < pTableMeta->tableInfo.numOfColumns; i++){ + SSchema *tag = pTableMeta->schema + i; + SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type }; + if(tag->type == TSDB_DATA_TYPE_NCHAR){ + kv.length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; + }else if(tag->type == TSDB_DATA_TYPE_BINARY){ + kv.length = tag->bytes - VARSTR_HEADER_SIZE; + } + taosArrayPush((*tmp)->cols, &kv); + } } info->currSTableMeta = (*tmp)->tableMeta; - superKV = (*tmp)->cols; - if (unlikely(taosArrayGetSize(superKV) == 0)) { - isSuperKVInit = false; - } - taosArrayClear(preLineKV); + info->masColKVs = (*tmp)->cols; } } @@ -439,69 +441,26 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin info->reRun = true; return TSDB_CODE_SUCCESS; } + if (cnt >= taosArrayGetSize(info->masColKVs)) { + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + SSmlKv *maxKV = (SSmlKv *)taosArrayGet(info->masColKVs, cnt); + if (kv.type != maxKV->type) { + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } + if (unlikely(!IS_SAME_KEY)) { + info->dataFormat = false; + info->reRun = true; + return TSDB_CODE_SUCCESS; + } - if (isSameMeasure) { - if (cnt >= taosArrayGetSize(preLineKV)) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - SSmlKv *maxKV = (SSmlKv *)taosArrayGet(preLineKV, cnt); - if (kv.type != maxKV->type) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - - if (unlikely(IS_VAR_DATA_TYPE(kv.type) && kv.length > maxKV->length)) { - maxKV->length = kv.length; - SSmlSTableMeta **tableMeta = - (SSmlSTableMeta **)taosHashGet(info->superTables, currElement->measure, currElement->measureLen); - if (unlikely(NULL == tableMeta)) { - uError("SML:0x%" PRIx64 " NULL == tableMeta", info->id); - return TSDB_CODE_SML_INTERNAL_ERROR; - } - - SSmlKv *oldKV = (SSmlKv *)taosArrayGet((*tableMeta)->cols, cnt); - oldKV->length = kv.length; - info->needModifySchema = true; - } - if (unlikely(!IS_SAME_KEY)) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - } else { - if (isSuperKVInit) { - if (unlikely(cnt >= taosArrayGetSize(superKV))) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - SSmlKv *maxKV = (SSmlKv *)taosArrayGet(superKV, cnt); - if (unlikely(kv.type != maxKV->type)) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - - if (IS_VAR_DATA_TYPE(kv.type)) { - if (kv.length > maxKV->length) { - maxKV->length = kv.length; - } else { - kv.length = maxKV->length; - } - info->needModifySchema = true; - } - if (unlikely(!IS_SAME_KEY)) { - info->dataFormat = false; - info->reRun = true; - return TSDB_CODE_SUCCESS; - } - } else { - taosArrayPush(superKV, &kv); - } - taosArrayPush(preLineKV, &kv); + if (unlikely(IS_VAR_DATA_TYPE(kv.type) && kv.length > maxKV->length)) { + maxKV->length = kv.length; + info->needModifySchema = true; } } else { if (currElement->colArray == NULL) { diff --git a/source/client/src/clientSmlTelnet.c b/source/client/src/clientSmlTelnet.c index 9201412cbd..ccf79cfc64 100644 --- a/source/client/src/clientSmlTelnet.c +++ b/source/client/src/clientSmlTelnet.c @@ -82,7 +82,7 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS if (info->dataFormat) { if (!isSameMeasure) { SSmlSTableMeta **tmp = (SSmlSTableMeta **)taosHashGet(info->superTables, elements->measure, elements->measureLen); - SSmlSTableMeta * sMeta = NULL; + SSmlSTableMeta *sMeta = NULL; if (unlikely(tmp == NULL)) { STableMeta *pTableMeta = smlGetMeta(info, elements->measure, elements->measureLen); if (pTableMeta == NULL) { @@ -93,12 +93,12 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS sMeta = smlBuildSTableMeta(info->dataFormat); sMeta->tableMeta = pTableMeta; taosHashPut(info->superTables, elements->measure, elements->measureLen, &sMeta, POINTER_BYTES); - tmp = &sMeta; for(int i = pTableMeta->tableInfo.numOfColumns; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++){ SSchema *tag = pTableMeta->schema + i; SSmlKv kv = {.key = tag->name, .keyLen = strlen(tag->name), .type = tag->type, .length = (tag->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE }; taosArrayPush(sMeta->tags, &kv); } + tmp = &sMeta; } info->currSTableMeta = (*tmp)->tableMeta; info->maxTagKVs = (*tmp)->tags; From cea32f2e3ce4e8f941564e2b97a678a10dd49386 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 4 Mar 2023 18:18:20 +0800 Subject: [PATCH 07/10] fix:give error if col is same in schemless & fix json parse error in TD-22903 --- utils/test/c/sml_test.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index 7441184271..3d2c08149b 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1191,8 +1191,8 @@ int main(int argc, char *argv[]) { } int ret = 0; -// ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file -// ASSERT(!ret); + ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file + ASSERT(!ret); // for(int i = 0; i < sizeof(str)/sizeof(str[0]); i++){ // printf("str:%s \t %d\n", str[i], smlCalTypeSum(str[i], strlen(str[i]))); // } From bfc189c9dcac96ea2784a2e1969a71f38b6c308f Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Sat, 4 Mar 2023 19:12:36 +0800 Subject: [PATCH 08/10] fix: update docs for WAL_RETENTION_PERIOD, etc. --- docs/en/12-taos-sql/02-database.md | 8 ++++---- docs/zh/12-taos-sql/02-database.md | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/docs/en/12-taos-sql/02-database.md b/docs/en/12-taos-sql/02-database.md index 5e8c9aaa92..a1c46875aa 100644 --- a/docs/en/12-taos-sql/02-database.md +++ b/docs/en/12-taos-sql/02-database.md @@ -75,10 +75,10 @@ database_option: { - TABLE_PREFIX:The prefix length in the table name that is ignored when distributing table to vnode based on table name. - TABLE_SUFFIX:The suffix length in the table name that is ignored when distributing table to vnode based on table name. - TSDB_PAGESIZE: The page size of the data storage engine in a vnode. The unit is KB. The default is 4 KB. The range is 1 to 16384, that is, 1 KB to 16 MB. -- WAL_RETENTION_PERIOD: specifies the time after which WAL files are deleted. This parameter is used for data subscription. Enter a time in seconds. The default value of single copy is 0. A value of 0 indicates that each WAL file is deleted immediately after its contents are written to disk. -1: WAL files are never deleted. The default value of multiple copy is 4 days. -- WAL_RETENTION_SIZE: specifies the size at which WAL files are deleted. This parameter is used for data subscription. Enter a size in KB. The default value of single copy is 0. A value of 0 indicates that each WAL file is deleted immediately after its contents are written to disk. -1: WAL files are never deleted. The default value of multiple copy is -1. -- WAL_ROLL_PERIOD: specifies the time after which WAL files are rotated. After this period elapses, a new WAL file is created. The default value of single copy is 0. A value of 0 indicates that a new WAL file is created only after the previous WAL file was written to disk. The default values of multiple copy is 1 day. -- WAL_SEGMENT_SIZE: specifies the maximum size of a WAL file. After the current WAL file reaches this size, a new WAL file is created. The default value is 0. A value of 0 indicates that a new WAL file is created only after the previous WAL file was written to disk. +- WAL_RETENTION_PERIOD: specifies the maximum time of which WAL files are to be kept after consumption. This parameter is used for data subscription. Enter a time in seconds. The default value 0. A value of 0 indicates that WAL files are not required to keep after consumption. -1: the time of WAL files to keep has no upper limit. +- WAL_RETENTION_SIZE: specifies the maximum total size of which WAL files are to be kept after consumption. This parameter is used for data subscription. Enter a size in KB. The default value is 0. A value of 0 indicates that WAL files are not required to keep after consumption. -1: the total size of WAL files to keep has no upper limit. +- WAL_ROLL_PERIOD: specifies the time after which WAL files are rotated. After this period elapses, a new WAL file is created. The default value is 0. A value of 0 indicates that a new WAL file is created only after TSDB data in memory are flushed to disk. +- WAL_SEGMENT_SIZE: specifies the maximum size of a WAL file. After the current WAL file reaches this size, a new WAL file is created. The default value is 0. A value of 0 indicates that a new WAL file is created only after TSDB data in memory are flushed to disk. ### Example Statement diff --git a/docs/zh/12-taos-sql/02-database.md b/docs/zh/12-taos-sql/02-database.md index d9ef3b51aa..9e346fdca8 100644 --- a/docs/zh/12-taos-sql/02-database.md +++ b/docs/zh/12-taos-sql/02-database.md @@ -75,10 +75,10 @@ database_option: { - TABLE_PREFIX:内部存储引擎根据表名分配存储该表数据的 VNODE 时要忽略的前缀的长度。 - TABLE_SUFFIX:内部存储引擎根据表名分配存储该表数据的 VNODE 时要忽略的后缀的长度。 - TSDB_PAGESIZE:一个 VNODE 中时序数据存储引擎的页大小,单位为 KB,默认为 4 KB。范围为 1 到 16384,即 1 KB到 16 MB。 -- WAL_RETENTION_PERIOD:wal 文件的额外保留策略,用于数据订阅。wal 的保存时长,单位为 s。单副本默认为 0,即落盘后立即删除。-1 表示不删除。多副本默认为 4 天。 -- WAL_RETENTION_SIZE:wal 文件的额外保留策略,用于数据订阅。wal 的保存的最大上限,单位为 KB。单副本默认为 0,即落盘后立即删除。多副本默认为-1,表示不删除。 -- WAL_ROLL_PERIOD:wal 文件切换时长,单位为 s。当 wal 文件创建并写入后,经过该时间,会自动创建一个新的 wal 文件。单副本默认为 0,即仅在落盘时创建新文件。多副本默认为 1 天。 -- WAL_SEGMENT_SIZE:wal 单个文件大小,单位为 KB。当前写入文件大小超过上限后会自动创建一个新的 wal 文件。默认为 0,即仅在落盘时创建新文件。 +- WAL_RETENTION_PERIOD:数据订阅已消费WAL日志,WAL文件的最大额外保留的时长策略。单位为 s。默认为 0,表示无需额外保留。-1, 表示额外保留,时间无上限。 +- WAL_RETENTION_SIZE:数据订阅已消费WAL日志,WAL文件的最大额外保留的累计大小策略。单位为 KB。默认为 0,表示无需额外保留。-1, 表示额外保留,累计大小无上限。 +- WAL_ROLL_PERIOD:wal 文件切换时长,单位为 s。当WAL文件创建并写入后,经过该时间,会自动创建一个新的WAL文件。默认为 0,即仅在TSDB落盘时创建新文件。 +- WAL_SEGMENT_SIZE:wal 单个文件大小,单位为 KB。当前写入文件大小超过上限后会自动创建一个新的WAL文件。默认为 0,即仅在TSDB落盘时创建新文件。 ### 创建数据库示例 From ee65a497816bf47585b5e5710d0f63d07f1287a6 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Sat, 4 Mar 2023 23:46:09 +0800 Subject: [PATCH 09/10] fix crash --- source/libs/transport/src/transCli.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index 274d153adc..e554580fc9 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -864,6 +864,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { QUEUE_REMOVE(&conn->q); QUEUE_INIT(&conn->q); + conn->broken = true; + if (conn->list != NULL) { SConnList* connList = conn->list; connList->list->numOfConn--; @@ -1230,7 +1232,7 @@ static void cliSendBatchCb(uv_write_t* req, int status) { } else { tDebug("%s conn %p succ to send batch msg, batch size:%d, msgLen:%d", CONN_GET_INST_LABEL(conn), conn, p->wLen, p->batchSize); - if (!uv_is_closing((uv_handle_t*)&conn->stream)) { + if (!uv_is_closing((uv_handle_t*)&conn->stream) && conn->broken == false) { if (nxtBatch != NULL) { conn->pBatch = nxtBatch; cliSendBatch(conn); From 54184cda3c322e514086ad57b64519befed26a07 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 6 Mar 2023 09:42:08 +0800 Subject: [PATCH 10/10] fix: should not set startFunc/stopFunc to 0. when the trans is not finished on taosd stop, it will cause the startFunc/stopFunc setting error --- source/dnode/mnode/impl/src/mndTrans.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index eccabf2d5c..1ae61213ec 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -512,7 +512,7 @@ static int32_t mndTransActionInsert(SSdb *pSdb, STrans *pTrans) { if (fp) { (*fp)(pSdb->pMnode, pTrans->param, pTrans->paramLen); } - pTrans->startFunc = 0; + // pTrans->startFunc = 0; } return 0; @@ -555,7 +555,7 @@ static int32_t mndTransActionDelete(SSdb *pSdb, STrans *pTrans, bool callFunc) { if (fp) { (*fp)(pSdb->pMnode, pTrans->param, pTrans->paramLen); } - pTrans->stopFunc = 0; + // pTrans->stopFunc = 0; } mndTransDropData(pTrans);