From 37d607c82053affb1b27f0360b3135690b06d812 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Wed, 10 May 2023 18:51:21 +0800 Subject: [PATCH 01/15] enh: remove group_key from subsidiaries if multiple select function presents --- source/libs/executor/src/executil.c | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index c51dc39b5b..bfecb65ef8 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1484,16 +1484,18 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu return TSDB_CODE_OUT_OF_MEMORY; } + int32_t numOfSelectFunc = 0; for (int32_t i = 0; i < numOfOutput; ++i) { const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; if ((strcmp(pName, "_select_value") == 0) || (strcmp(pName, "_group_key") == 0)) { pValCtx[num++] = &pCtx[i]; } else if (fmIsSelectFunc(pCtx[i].functionId)) { p = &pCtx[i]; + numOfSelectFunc++; } } - if (p != NULL) { + if (p != NULL && numOfSelectFunc == 1) { p->subsidiaries.pCtx = pValCtx; p->subsidiaries.num = num; } else { From 185591ed77e7e573415ef98289b138cda3b1bfa2 Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Wed, 10 May 2023 19:19:07 +0800 Subject: [PATCH 02/15] [TD-23710]: add test case --- tests/system-test/0-others/user_privilege.py | 48 +++++++++++++++----- 1 file changed, 36 insertions(+), 12 deletions(-) diff --git a/tests/system-test/0-others/user_privilege.py b/tests/system-test/0-others/user_privilege.py index 6d49ebfbfe..d1b93f6942 100644 --- a/tests/system-test/0-others/user_privilege.py +++ b/tests/system-test/0-others/user_privilege.py @@ -29,6 +29,7 @@ class TDTestCase: self.stbname = 'stb' self.binary_length = 20 # the length of binary for column_dict self.nchar_length = 20 # the length of nchar for column_dict + self.dbnames = ['db1', 'db2'] self.column_dict = { 'ts': 'timestamp', 'col1': 'float', @@ -57,21 +58,25 @@ class TDTestCase: def create_user(self): user_name = 'test' tdSql.execute(f'create user {user_name} pass "test"') - tdSql.execute(f'grant read on db.stb with t2 = "Beijing" to {user_name}') + tdSql.execute(f'grant read on {self.dbnames[0]}.{self.stbname} with t2 = "Beijing" to {user_name}') + tdSql.execute(f'grant write on {self.dbnames[1]}.{self.stbname} with t1 = 2 to {user_name}') def prepare_data(self): - tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict)) - for i in range(self.tbnum): - tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})') - for j in self.values_list: - tdSql.execute(f'insert into {self.stbname}_{i} values({j})') + for db in self.dbnames: + tdSql.execute(f"create database {db}") + tdSql.execute(f"use {db}") + tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict)) + for i in range(self.tbnum): + tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})') + for j in self.values_list: + tdSql.execute(f'insert into {self.stbname}_{i} values({j})') - def user_privilege_check(self): + def user_read_privilege_check(self, dbname): testconn = taos.connect(user='test', password='test') expectErrNotOccured = False try: - sql = "select count(*) from db.stb where t2 = 'Beijing'" + sql = f"select count(*) from {dbname}.stb where t2 = 'Beijing'" res = testconn.query(sql) data = res.fetch_all() count = data[0][0] @@ -85,11 +90,30 @@ class TDTestCase: tdLog.exit(f"{sql}, expect result doesn't match") pass + def user_write_privilege_check(self, dbname): + testconn = taos.connect(user='test', password='test') + expectErrNotOccured = False + + try: + sql = f"insert into {dbname}.stb_1 values(now, 1.1, 200, 0.3)" + testconn.execute(sql) + except BaseException: + expectErrNotOccured = True + + if expectErrNotOccured: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured") + else: + pass + def user_privilege_error_check(self): testconn = taos.connect(user='test', password='test') expectErrNotOccured = False - sql_list = ["alter talbe db.stb_1 set t2 = 'Wuhan'", "drop table db.stb_1"] + sql_list = [f"alter talbe {self.dbnames[0]}.stb_1 set t2 = 'Wuhan'", + f"insert into {self.dbnames[0]}.stb_1 values(now, 1.1, 200, 0.3)", + f"drop table {self.dbnames[0]}.stb_1", + f"select count(*) from {self.dbnames[1]}.stb"] for sql in sql_list: try: @@ -104,11 +128,11 @@ class TDTestCase: tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured") pass - def run(self): - tdSql.prepare() + def run(self): self.prepare_data() self.create_user() - self.user_privilege_check() + self.user_read_privilege_check(self.dbnames[0]) + self.user_write_privilege_check(self.dbnames[1]) self.user_privilege_error_check() def stop(self): From db318a40c2bd307ec91cd5b102ea8145b851bffa Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Thu, 11 May 2023 00:55:02 +0800 Subject: [PATCH 03/15] fix: func ctgHandleGetTbMetasRsp invalid read SName --- source/libs/catalog/src/ctgAsync.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/catalog/src/ctgAsync.c b/source/libs/catalog/src/ctgAsync.c index 56c79eac1f..9e654e89d9 100644 --- a/source/libs/catalog/src/ctgAsync.c +++ b/source/libs/catalog/src/ctgAsync.c @@ -1437,12 +1437,12 @@ _return: SMetaRes* pRes = taosArrayGet(ctx->pResList, pFetch->resIdx); pRes->code = code; pRes->pRes = NULL; + ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, + tstrerror(code)); if (0 == atomic_sub_fetch_32(&ctx->fetchNum, 1)) { TSWAP(pTask->res, ctx->pResList); taskDone = true; } - ctgTaskError("Get table %d.%s.%s meta failed with error %s", pName->acctId, pName->dbname, pName->tname, - tstrerror(code)); } if (pTask->res && taskDone) { From 9ce1e8ccf437d628860854a1bd0e0182b2fffa75 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 11 May 2023 13:23:40 +0800 Subject: [PATCH 04/15] fix --- source/libs/executor/src/executil.c | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index bfecb65ef8..0cab9097b6 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1484,18 +1484,25 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu return TSDB_CODE_OUT_OF_MEMORY; } - int32_t numOfSelectFunc = 0; + SHashObj *pSelectFuncs = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); for (int32_t i = 0; i < numOfOutput; ++i) { const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; if ((strcmp(pName, "_select_value") == 0) || (strcmp(pName, "_group_key") == 0)) { pValCtx[num++] = &pCtx[i]; } else if (fmIsSelectFunc(pCtx[i].functionId)) { - p = &pCtx[i]; - numOfSelectFunc++; + void* data = taosHashGet(pSelectFuncs, &pName, strlen(pName)); + if (taosHashGetSize(pSelectFuncs) != 0 && data == NULL) { + p = NULL; + break; + } else { + taosHashPut(pSelectFuncs, pName, strlen(pName), &num, sizeof(num)); + p = &pCtx[i]; + } } } + taosHashCleanup(pSelectFuncs); - if (p != NULL && numOfSelectFunc == 1) { + if (p != NULL) { p->subsidiaries.pCtx = pValCtx; p->subsidiaries.num = num; } else { From e58fda2bddb8cd994f2e9d7e19b80d2cacad8f60 Mon Sep 17 00:00:00 2001 From: slzhou Date: Thu, 11 May 2023 16:04:06 +0800 Subject: [PATCH 05/15] fix: bypass projection pruning if union set op and subquery --- source/libs/parser/src/parCalcConst.c | 3 +++ source/libs/planner/src/planner.c | 8 ++++++++ 2 files changed, 11 insertions(+) diff --git a/source/libs/parser/src/parCalcConst.c b/source/libs/parser/src/parCalcConst.c index c25d0e7036..01b62a9051 100644 --- a/source/libs/parser/src/parCalcConst.c +++ b/source/libs/parser/src/parCalcConst.c @@ -388,6 +388,9 @@ static bool isSetUselessCol(SSetOperator* pSetOp, int32_t index, SExprNode* pPro } static int32_t calcConstSetOpProjections(SCalcConstContext* pCxt, SSetOperator* pSetOp, bool subquery) { + if (subquery && pSetOp->opType == SET_OP_TYPE_UNION) { + return TSDB_CODE_SUCCESS; + } int32_t index = 0; SNode* pProj = NULL; WHERE_EACH(pProj, pSetOp->pProjectionList) { diff --git a/source/libs/planner/src/planner.c b/source/libs/planner/src/planner.c index c6a4a97f6e..58b8e53478 100644 --- a/source/libs/planner/src/planner.c +++ b/source/libs/planner/src/planner.c @@ -19,6 +19,14 @@ #include "scalar.h" #include "tglobal.h" +static void debugPrintNode(SNode* pNode) { + char* pStr = NULL; + nodesNodeToString(pNode, false, &pStr, NULL); + printf("%s\n", pStr); + taosMemoryFree(pStr); + return; +} + static void dumpQueryPlan(SQueryPlan* pPlan) { if (!tsQueryPlannerTrace) { return; From 788b521592aa84eccd7992c84d9fa77e87d8189e Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 11 May 2023 17:20:58 +0800 Subject: [PATCH 06/15] Revert "fix" This reverts commit 9ce1e8ccf437d628860854a1bd0e0182b2fffa75. --- source/libs/executor/src/executil.c | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 0cab9097b6..bfecb65ef8 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1484,25 +1484,18 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu return TSDB_CODE_OUT_OF_MEMORY; } - SHashObj *pSelectFuncs = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); + int32_t numOfSelectFunc = 0; for (int32_t i = 0; i < numOfOutput; ++i) { const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; if ((strcmp(pName, "_select_value") == 0) || (strcmp(pName, "_group_key") == 0)) { pValCtx[num++] = &pCtx[i]; } else if (fmIsSelectFunc(pCtx[i].functionId)) { - void* data = taosHashGet(pSelectFuncs, &pName, strlen(pName)); - if (taosHashGetSize(pSelectFuncs) != 0 && data == NULL) { - p = NULL; - break; - } else { - taosHashPut(pSelectFuncs, pName, strlen(pName), &num, sizeof(num)); - p = &pCtx[i]; - } + p = &pCtx[i]; + numOfSelectFunc++; } } - taosHashCleanup(pSelectFuncs); - if (p != NULL) { + if (p != NULL && numOfSelectFunc == 1) { p->subsidiaries.pCtx = pValCtx; p->subsidiaries.num = num; } else { From 3bf9cb2d78388c6382a91be7b24771d2c1f177d3 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Thu, 11 May 2023 13:23:40 +0800 Subject: [PATCH 07/15] fix --- source/libs/executor/src/executil.c | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index bfecb65ef8..0cab9097b6 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1484,18 +1484,25 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu return TSDB_CODE_OUT_OF_MEMORY; } - int32_t numOfSelectFunc = 0; + SHashObj *pSelectFuncs = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_ENTRY_LOCK); for (int32_t i = 0; i < numOfOutput; ++i) { const char* pName = pCtx[i].pExpr->pExpr->_function.functionName; if ((strcmp(pName, "_select_value") == 0) || (strcmp(pName, "_group_key") == 0)) { pValCtx[num++] = &pCtx[i]; } else if (fmIsSelectFunc(pCtx[i].functionId)) { - p = &pCtx[i]; - numOfSelectFunc++; + void* data = taosHashGet(pSelectFuncs, &pName, strlen(pName)); + if (taosHashGetSize(pSelectFuncs) != 0 && data == NULL) { + p = NULL; + break; + } else { + taosHashPut(pSelectFuncs, pName, strlen(pName), &num, sizeof(num)); + p = &pCtx[i]; + } } } + taosHashCleanup(pSelectFuncs); - if (p != NULL && numOfSelectFunc == 1) { + if (p != NULL) { p->subsidiaries.pCtx = pValCtx; p->subsidiaries.num = num; } else { From f4d90fb9bbd236b11f6d9216abf2a22a8ddb65c2 Mon Sep 17 00:00:00 2001 From: kailixu Date: Thu, 11 May 2023 20:59:07 +0800 Subject: [PATCH 08/15] fix: column length check when update/add column --- source/libs/parser/src/parTranslater.c | 13 ++++++++++++- tests/script/tsim/alter/table.sim | 19 ++++++++++++++++++- 2 files changed, 30 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index f4c86d4849..db1e82b910 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5326,7 +5326,8 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable } if (TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES == pStmt->alterType) { - if (calcTypeBytes(pStmt->dataType) > TSDB_MAX_FIELD_LEN) { + if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) || + (TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN); } @@ -5351,6 +5352,11 @@ static int32_t checkAlterSuperTableBySchema(STranslateContext* pCxt, SAlterTable return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_TOO_MANY_COLUMNS); } + if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) || + (TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN); + } + if (pTableMeta->tableInfo.rowSize + calcTypeBytes(pStmt->dataType) > TSDB_MAX_BYTES_PER_ROW) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW); } @@ -8322,6 +8328,11 @@ static int32_t buildUpdateColReq(STranslateContext* pCxt, SAlterTableStmt* pStmt return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_MODIFY_COL); } + if ((TSDB_DATA_TYPE_VARCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_BINARY_LEN) || + (TSDB_DATA_TYPE_NCHAR == pStmt->dataType.type && calcTypeBytes(pStmt->dataType) > TSDB_MAX_NCHAR_LEN)) { + return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN); + } + if (pTableMeta->tableInfo.rowSize + pReq->colModBytes - pSchema->bytes > TSDB_MAX_BYTES_PER_ROW) { return generateSyntaxErrMsg(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_ROW_LENGTH, TSDB_MAX_BYTES_PER_ROW); } diff --git a/tests/script/tsim/alter/table.sim b/tests/script/tsim/alter/table.sim index ded5d6f78a..5f45b446ca 100644 --- a/tests/script/tsim/alter/table.sim +++ b/tests/script/tsim/alter/table.sim @@ -657,17 +657,34 @@ if $data20 != null then return -1 endi -print =============== error +print =============== error for normal table sql create table tb2023(ts timestamp, f int); sql_error alter table tb2023 add column v varchar(16375); sql_error alter table tb2023 add column v varchar(16385); sql_error alter table tb2023 add column v varchar(33100); sql alter table tb2023 add column v varchar(16374); +sql_error alter table tb2023 modify column v varchar(16375); sql desc tb2023 sql alter table tb2023 drop column v sql_error alter table tb2023 add column v nchar(4094); sql alter table tb2023 add column v nchar(4093); +sql_error alter table tb2023 modify column v nchar(4094); sql desc tb2023 + +print =============== error for super table +sql create table stb2023(ts timestamp, f int) tags(t1 int); +sql_error alter table stb2023 add column v varchar(16375); +sql_error alter table stb2023 add column v varchar(16385); +sql_error alter table stb2023 add column v varchar(33100); +sql alter table stb2023 add column v varchar(16374); +sql_error alter table stb2023 modify column v varchar(16375); +sql desc stb2023 +sql alter table stb2023 drop column v +sql_error alter table stb2023 add column v nchar(4094); +sql alter table stb2023 add column v nchar(4093); +sql_error alter table stb2023 modify column v nchar(4094); +sql desc stb2023 + print ======= over sql drop database d1 sql select * from information_schema.ins_databases From 00fb5acad7bea9e90a96155a6f02ab59d64ad38b Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 12 May 2023 07:19:03 +0800 Subject: [PATCH 09/15] enhance: add test case --- tests/script/tsim/query/unionall_as_table.sim | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/tests/script/tsim/query/unionall_as_table.sim b/tests/script/tsim/query/unionall_as_table.sim index dc3d2cbec4..f11906214b 100644 --- a/tests/script/tsim/query/unionall_as_table.sim +++ b/tests/script/tsim/query/unionall_as_table.sim @@ -25,4 +25,14 @@ if $data05 != @0021001@ then return -1 endi +sql create table st (ts timestamp, f int) tags (t int); +sql insert into ct1 using st tags(1) values(now, 1)(now+1s, 2) +sql insert into ct2 using st tags(2) values(now+2s, 3)(now+3s, 4) +sql select count(*) from (select * from ct1 union all select * from ct2) +if $rows != 1 then + return -1 +endi +if $data00 != 4 then + return -1 +endi system sh/exec.sh -n dnode1 -s stop -x SIGINT From d8a7e140652068adc479f47eaff223899f9e30a7 Mon Sep 17 00:00:00 2001 From: slzhou Date: Fri, 12 May 2023 07:20:27 +0800 Subject: [PATCH 10/15] enhance: add test case --- tests/script/tsim/query/unionall_as_table.sim | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/tests/script/tsim/query/unionall_as_table.sim b/tests/script/tsim/query/unionall_as_table.sim index f11906214b..4d8f990718 100644 --- a/tests/script/tsim/query/unionall_as_table.sim +++ b/tests/script/tsim/query/unionall_as_table.sim @@ -35,4 +35,11 @@ endi if $data00 != 4 then return -1 endi +sql select count(*) from (select * from ct1 union select * from ct2) +if $rows != 1 then + return -1 +endi +if $data00 != 4 then + return -1 +endi system sh/exec.sh -n dnode1 -s stop -x SIGINT From 2e0d463471590ababa7271fa39017f3be0308df3 Mon Sep 17 00:00:00 2001 From: kailixu Date: Fri, 12 May 2023 09:57:11 +0800 Subject: [PATCH 11/15] fix: update test case alter_column.sim --- tests/script/tsim/parser/alter_column.sim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/tsim/parser/alter_column.sim b/tests/script/tsim/parser/alter_column.sim index c70a604c73..d569e47735 100644 --- a/tests/script/tsim/parser/alter_column.sim +++ b/tests/script/tsim/parser/alter_column.sim @@ -48,7 +48,7 @@ sql_error alter table tb modify column c2 binary(10); sql_error alter table tb modify column c2 binary(9); sql_error alter table tb modify column c2 binary(-9); sql_error alter table tb modify column c2 binary(0); -sql alter table tb modify column c2 binary(17000); +sql_error alter table tb modify column c2 binary(17000); sql_error alter table tb modify column c2 nchar(30); sql_error alter table tb modify column c3 double; sql_error alter table tb modify column c3 nchar(10); From 83e049812c06aa44326e3234b6cca71fc0e275c5 Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Fri, 12 May 2023 10:21:01 +0800 Subject: [PATCH 12/15] fix asan error --- source/libs/executor/src/executil.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/executor/src/executil.c b/source/libs/executor/src/executil.c index 0cab9097b6..c8b16ad83b 100644 --- a/source/libs/executor/src/executil.c +++ b/source/libs/executor/src/executil.c @@ -1490,7 +1490,7 @@ static int32_t setSelectValueColumnInfo(SqlFunctionCtx* pCtx, int32_t numOfOutpu if ((strcmp(pName, "_select_value") == 0) || (strcmp(pName, "_group_key") == 0)) { pValCtx[num++] = &pCtx[i]; } else if (fmIsSelectFunc(pCtx[i].functionId)) { - void* data = taosHashGet(pSelectFuncs, &pName, strlen(pName)); + void* data = taosHashGet(pSelectFuncs, pName, strlen(pName)); if (taosHashGetSize(pSelectFuncs) != 0 && data == NULL) { p = NULL; break; From ba4d5e0b752af5d33d899c7d8f625b2f99008a30 Mon Sep 17 00:00:00 2001 From: Shuduo Sang Date: Fri, 12 May 2023 10:48:55 +0800 Subject: [PATCH 13/15] docs: update readme with tdengin1 wechat (#21268) --- README-CN.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README-CN.md b/README-CN.md index 189b7a059a..12ac7b9ee7 100644 --- a/README-CN.md +++ b/README-CN.md @@ -352,4 +352,4 @@ TDengine 提供了丰富的应用程序开发接口,其中包括 C/C++、Java # 加入技术交流群 -TDengine 官方社群「物联网大数据群」对外开放,欢迎您加入讨论。搜索微信号 "tdengine",加小 T 为好友,即可入群。 +TDengine 官方社群「物联网大数据群」对外开放,欢迎您加入讨论。搜索微信号 "tdengine1",加小 T 为好友,即可入群。 From d85f0f56d5cf7616cf571ec48b7fb070193e4624 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 12 May 2023 10:51:13 +0800 Subject: [PATCH 14/15] fix:[TD-24111]avoid exec pHandle task in multi query thread --- source/dnode/vnode/src/inc/tq.h | 1 + source/dnode/vnode/src/tq/tqPush.c | 1 + source/dnode/vnode/src/tq/tqUtil.c | 76 +++++++++++++++--------------- 3 files changed, 41 insertions(+), 37 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index e431ca4a01..b2223f9d64 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -102,6 +102,7 @@ typedef struct { STqExecHandle execHandle; // exec SRpcMsg* msg; int32_t noDataPollCnt; + int8_t sendRsp; } STqHandle; typedef struct { diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index 950c5ea96b..85fcb4fd80 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -55,6 +55,7 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); } else { + tqPushDataRsp(pTq, pHandle); void* tmp = pHandle->msg->pCont; memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = tmp; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 94803ef438..128f83bf53 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -162,6 +162,10 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return 0; } +static bool isHandleExecuting(STqHandle* pHandle){ + return 0 == atomic_load_8(&pHandle->sendRsp); +} + static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* pOffset) { uint64_t consumerId = pRequest->consumerId; @@ -177,6 +181,12 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, return code; } + while(isHandleExecuting(pHandle)){ + tqInfo("sub is executing, pHandle:%p", pHandle); + taosMsleep(5); + } + atomic_store_8(&pHandle->sendRsp, 0); + qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); code = tqScanData(pTq, pHandle, &dataRsp, pOffset); if(code != 0) { @@ -193,6 +203,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, code = tqRegisterPushHandle(pTq, pHandle, pMsg); taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); + atomic_store_8(&pHandle->sendRsp, 1); return code; } else{ @@ -202,7 +213,6 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&dataRsp, TMQ_MSG_TYPE__POLL_RSP); - // NOTE: this pHandle->consumerId may have been changed already. end: @@ -214,12 +224,14 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); } + atomic_store_8(&pHandle->sendRsp, 1); + return code; } static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal *offset) { - int code = 0; + int code = 0; int32_t vgId = TD_VID(pTq->pVnode); SWalCkHead* pCkHead = NULL; SMqMetaRsp metaRsp = {0}; @@ -232,10 +244,16 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, return code; } + while(isHandleExecuting(pHandle)){ + tqInfo("sub is executing, pHandle:%p", pHandle); + taosMsleep(5); + } + atomic_store_8(&pHandle->sendRsp, 0); + if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { - tDeleteSTaosxRsp(&taosxRsp); - return -1; + code = -1; + goto end; } if (metaRsp.metaRspLen > 0) { @@ -243,16 +261,14 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64, pRequest->consumerId, pHandle->subKey, vgId, metaRsp.rspOffset.type, metaRsp.rspOffset.uid, metaRsp.rspOffset.ts); taosMemoryFree(metaRsp.metaRsp); - tDeleteSTaosxRsp(&taosxRsp); - return code; + goto end; } tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 ",ts:%" PRId64,pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid,taosxRsp.rspOffset.ts); if (taosxRsp.blockNum > 0) { code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - return code; + goto end; }else { *offset = taosxRsp.rspOffset; } @@ -264,9 +280,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, int64_t fetchVer = offset->version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { - tDeleteSTaosxRsp(&taosxRsp); terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; + code = -1; + goto end; } walSetReaderCapacity(pHandle->pWalReader, 2048); int totalRows = 0; @@ -281,9 +297,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (tqFetchLog(pTq, pHandle, &fetchVer, &pCkHead, pRequest->reqId) < 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return code; + goto end; } SWalCont* pHead = &pCkHead->head; @@ -295,9 +309,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if(totalRows > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer - 1); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return code; + goto end; } tqDebug("fetch meta msg, ver:%" PRId64 ", type:%s", pHead->version, TMSG_INFO(pHead->msgType)); @@ -305,17 +317,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, metaRsp.resMsgType = pHead->msgType; metaRsp.metaRspLen = pHead->bodyLen; metaRsp.metaRsp = pHead->body; - if (tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp) < 0) { - code = -1; - taosMemoryFreeClear(pCkHead); - tDeleteSTaosxRsp(&taosxRsp); - return code; - } - - code = 0; - taosMemoryFreeClear(pCkHead); - tDeleteSTaosxRsp(&taosxRsp); - return code; + code = tqSendMetaPollRsp(pTq, pMsg, pRequest, &metaRsp); + goto end; } // process data @@ -325,29 +328,28 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, .ver = pHead->version, }; - if (tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows) < 0) { - tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, - pRequest->subKey); - taosMemoryFreeClear(pCkHead); - tDeleteSTaosxRsp(&taosxRsp); - return -1; + code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows); + if (code < 0) { + tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, pRequest->subKey); + goto end; } if (totalRows >= 4096 || taosxRsp.createTableNum > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); code = tqSendDataRsp(pTq, pMsg, pRequest, (SMqDataRsp*)&taosxRsp, TMQ_MSG_TYPE__TAOSX_RSP); - tDeleteSTaosxRsp(&taosxRsp); - taosMemoryFreeClear(pCkHead); - return code; + goto end; } else { fetchVer++; } } } +end: + atomic_store_8(&pHandle->sendRsp, 1); + tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead); - return 0; + return code; } int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg) { From cd105381a9a3f10c0e14af2386ea5c5e259981bf Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 12 May 2023 11:41:59 +0800 Subject: [PATCH 15/15] fix:[TD-24111]avoid exec pHandle task in multi query thread --- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tqUtil.c | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index b2223f9d64..85415fbe34 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -102,7 +102,7 @@ typedef struct { STqExecHandle execHandle; // exec SRpcMsg* msg; int32_t noDataPollCnt; - int8_t sendRsp; + int8_t exec; } STqHandle; typedef struct { diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 128f83bf53..32819a5924 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -163,7 +163,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand } static bool isHandleExecuting(STqHandle* pHandle){ - return 0 == atomic_load_8(&pHandle->sendRsp); + return 1 == atomic_load_8(&pHandle->exec); } static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, @@ -185,7 +185,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, tqInfo("sub is executing, pHandle:%p", pHandle); taosMsleep(5); } - atomic_store_8(&pHandle->sendRsp, 0); + atomic_store_8(&pHandle->exec, 1); qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); code = tqScanData(pTq, pHandle, &dataRsp, pOffset); @@ -203,7 +203,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, code = tqRegisterPushHandle(pTq, pHandle, pMsg); taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); - atomic_store_8(&pHandle->sendRsp, 1); + atomic_store_8(&pHandle->exec, 0); return code; } else{ @@ -224,7 +224,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, // taosWUnLockLatch(&pTq->lock); tDeleteSMqDataRsp(&dataRsp); } - atomic_store_8(&pHandle->sendRsp, 1); + atomic_store_8(&pHandle->exec, 0); return code; } @@ -248,7 +248,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tqInfo("sub is executing, pHandle:%p", pHandle); taosMsleep(5); } - atomic_store_8(&pHandle->sendRsp, 0); + atomic_store_8(&pHandle->exec, 1); if (offset->type != TMQ_OFFSET__LOG) { if (tqScanTaosx(pTq, pHandle, &taosxRsp, &metaRsp, offset) < 0) { @@ -345,7 +345,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } end: - atomic_store_8(&pHandle->sendRsp, 1); + atomic_store_8(&pHandle->exec, 0); tDeleteSTaosxRsp(&taosxRsp); taosMemoryFreeClear(pCkHead);