From 3678b2373ae7cf7aa05ad0c4e769c5c607be00d3 Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Sun, 23 Apr 2023 20:45:55 +0800 Subject: [PATCH 01/33] [TS-3140]: add test case for user privilege --- tests/parallel_test/cases.task | 1 + tests/system-test/0-others/user_privilege.py | 100 +++++++++++++++++++ 2 files changed, 101 insertions(+) create mode 100644 tests/system-test/0-others/user_privilege.py diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 6e662a9a15..1403d18414 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -126,6 +126,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/sysinfo.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_control.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_manage.py +,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/user_privilege.py ,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/fsync.py ,,n,system-test,python3 ./test.py -f 0-others/compatibility.py ,,n,system-test,python3 ./test.py -f 0-others/tag_index_basic.py diff --git a/tests/system-test/0-others/user_privilege.py b/tests/system-test/0-others/user_privilege.py new file mode 100644 index 0000000000..b470026be1 --- /dev/null +++ b/tests/system-test/0-others/user_privilege.py @@ -0,0 +1,100 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- + +import taos +from taos.tmq import * +from util.cases import * +from util.common import * +from util.log import * +from util.sql import * +from util.sqlset import * + + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug("start to execute %s" % __file__) + tdSql.init(conn.cursor()) + self.setsql = TDSetSql() + self.stbname = 'stb' + self.binary_length = 20 # the length of binary for column_dict + self.nchar_length = 20 # the length of nchar for column_dict + self.column_dict = { + 'ts': 'timestamp', + 'col1': 'float', + 'col2': 'int', + 'col3': 'float', + } + + self.tag_dict = { + 't1': 'int', + 't2': f'binary({self.binary_length})' + } + + self.tag_list = [ + f'1, "Beijing"', + f'2, "Shanghai"', + f'3, "Guangzhou"', + f'4, "Shenzhen"' + ] + + self.values_list = [ + f'now, 9.1, 200, 0.3' + ] + + self.tbnum = 4 + + def create_user(self): + user_name = 'test' + tdSql.execute(f'create user {user_name} pass "test"') + tdSql.execute(f'grant read on db.stb with t2 = "Beijing" to {user_name}') + + def prepare_data(self): + tdSql.execute(self.setsql.set_create_stable_sql(self.stbname, self.column_dict, self.tag_dict)) + for i in range(self.tbnum): + tdSql.execute(f'create table {self.stbname}_{i} using {self.stbname} tags({self.tag_list[i]})') + for j in self.values_list: + tdSql.execute(f'insert into {self.stbname}_{i} values({j})') + + def user_privilege_check(self): + testconn = taos.connect(user='test', password='test') + expectErrNotOccured = False + + try: + sql = "select count(*) from db.stb where t2 = 'Beijing'" + res = testconn.query(sql) + data = res.fetch_all() + count = data[0][0] + except BaseException: + expectErrNotOccured = True + + if expectErrNotOccured: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured") + elif count != 1: + tdLog.exit(f"{sql}, expect result doesn't match") + pass + + def run(self): + tdSql.prepare() + self.prepare_data() + self.create_user() + self.user_privilege_check() + + def stop(self): + tdSql.close() + tdLog.success("%s successfully executed" % __file__) + + +tdCases.addWindows(__file__, TDTestCase()) +tdCases.addLinux(__file__, TDTestCase()) \ No newline at end of file From 7b73130bd4e756fb4c9867b59ba00a4045852f9f Mon Sep 17 00:00:00 2001 From: Ping Xiao Date: Mon, 24 Apr 2023 19:53:50 +0800 Subject: [PATCH 02/33] udpate test case --- tests/system-test/0-others/user_privilege.py | 22 +++++++++++++++++++- 1 file changed, 21 insertions(+), 1 deletion(-) diff --git a/tests/system-test/0-others/user_privilege.py b/tests/system-test/0-others/user_privilege.py index b470026be1..6d49ebfbfe 100644 --- a/tests/system-test/0-others/user_privilege.py +++ b/tests/system-test/0-others/user_privilege.py @@ -84,12 +84,32 @@ class TDTestCase: elif count != 1: tdLog.exit(f"{sql}, expect result doesn't match") pass + + def user_privilege_error_check(self): + testconn = taos.connect(user='test', password='test') + expectErrNotOccured = False + + sql_list = ["alter talbe db.stb_1 set t2 = 'Wuhan'", "drop table db.stb_1"] + + for sql in sql_list: + try: + res = testconn.execute(sql) + except BaseException: + expectErrNotOccured = True + + if expectErrNotOccured: + pass + else: + caller = inspect.getframeinfo(inspect.stack()[1][0]) + tdLog.exit(f"{caller.filename}({caller.lineno}) failed: sql:{sql}, expect error not occured") + pass def run(self): tdSql.prepare() self.prepare_data() self.create_user() - self.user_privilege_check() + self.user_privilege_check() + self.user_privilege_error_check() def stop(self): tdSql.close() From bb33f054b4c1a034118b8cfc3b474b243757e22e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 4 May 2023 17:21:18 +0800 Subject: [PATCH 03/33] fix:change field bytes if length is bigger than 1024 --- source/client/src/clientSml.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index d6642dfe8d..01a7a2eac2 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1562,7 +1562,8 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL do { code = smlModifyDBSchemas(info); if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS - || code == TSDB_CODE_PAR_INVALID_TAGS_NUM) break; + || code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH + || code == TSDB_CODE_PAR_INVALID_ROW_LENGTH) break; taosMsleep(100); uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum); } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES); From 1d8c517844c5b3c2d1386b5412b05d41215c24d4 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 5 May 2023 10:46:54 +0800 Subject: [PATCH 04/33] fix:[TS-3303]use stable name + child table name as key to save uid to avoid multi items for one table in submit block --- source/client/inc/clientSml.h | 2 ++ source/client/src/clientSml.c | 19 ++++++++++++- source/client/src/clientSmlJson.c | 2 +- source/client/src/clientSmlLine.c | 2 +- source/client/src/clientSmlTelnet.c | 2 +- tests/system-test/2-query/sml.py | 1 + utils/test/c/sml_test.c | 41 +++++++++++++++++++++++++++++ 7 files changed, 65 insertions(+), 4 deletions(-) diff --git a/source/client/inc/clientSml.h b/source/client/inc/clientSml.h index 3982c0d9aa..b20fc6f57a 100644 --- a/source/client/inc/clientSml.h +++ b/source/client/inc/clientSml.h @@ -169,6 +169,7 @@ typedef struct { int32_t uid; // used for automatic create child table SHashObj *childTables; + SHashObj *tableUids; SHashObj *superTables; SHashObj *pVgHash; @@ -242,6 +243,7 @@ int8_t smlGetTsTypeByLen(int32_t len); SSmlTableInfo* smlBuildTableInfo(int numRows, const char* measure, int32_t measureLen); SSmlSTableMeta* smlBuildSTableMeta(bool isDataFormat); int32_t smlSetCTableName(SSmlTableInfo *oneTable); +void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tinfo); STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen); int32_t is_same_child_table_telnet(const void *a, const void *b); int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len); diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index dd3f50f440..c5f7d9a1e7 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -195,6 +195,20 @@ int32_t smlSetCTableName(SSmlTableInfo *oneTable) { return TSDB_CODE_SUCCESS; } +void getTableUid(SSmlHandle *info, SSmlLineInfo *currElement, SSmlTableInfo *tinfo){ + char key[TSDB_TABLE_NAME_LEN * 2 + 1] = {0}; + size_t nLen = strlen(tinfo->childTableName); + memcpy(key, currElement->measure, currElement->measureLen); + memcpy(key + currElement->measureLen + 1, tinfo->childTableName, nLen); + void *uid = taosHashGet(info->tableUids, key, currElement->measureLen + 1 + nLen); // use \0 as separator for stable name and child table name + if (uid == NULL) { + tinfo->uid = info->uid++; + taosHashPut(info->tableUids, key, currElement->measureLen + 1 + nLen, &tinfo->uid, sizeof(uint64_t)); + }else{ + tinfo->uid = *(uint64_t*)uid; + } +} + SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) { SSmlSTableMeta *meta = (SSmlSTableMeta *)taosMemoryCalloc(sizeof(SSmlSTableMeta), 1); if (!meta) { @@ -1142,6 +1156,7 @@ void smlDestroyInfo(SSmlHandle *info) { taosHashCleanup(info->pVgHash); taosHashCleanup(info->childTables); taosHashCleanup(info->superTables); + taosHashCleanup(info->tableUids); for (int i = 0; i < taosArrayGetSize(info->tagJsonArray); i++) { cJSON *tags = (cJSON *)taosArrayGetP(info->tagJsonArray, i); @@ -1192,6 +1207,7 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) { info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); info->childTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); + info->tableUids = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); info->superTables = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); info->id = smlGenId(); @@ -1202,7 +1218,7 @@ SSmlHandle *smlBuildSmlInfo(TAOS *taos) { info->valueJsonArray = taosArrayInit(8, POINTER_BYTES); info->preLineTagKV = taosArrayInit(8, sizeof(SSmlKv)); - if (NULL == info->pVgHash || NULL == info->childTables || NULL == info->superTables) { + if (NULL == info->pVgHash || NULL == info->childTables || NULL == info->superTables || NULL == info->tableUids) { uError("create SSmlHandle failed"); goto cleanup; } @@ -1428,6 +1444,7 @@ int32_t smlClearForRerun(SSmlHandle *info) { taosHashClear(info->childTables); taosHashClear(info->superTables); + taosHashClear(info->tableUids); if (!info->dataFormat) { if (unlikely(info->lines != NULL)) { diff --git a/source/client/src/clientSmlJson.c b/source/client/src/clientSmlJson.c index b0ae316031..7ccf930964 100644 --- a/source/client/src/clientSmlJson.c +++ b/source/client/src/clientSmlJson.c @@ -778,7 +778,7 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *tags, SSmlLineInfo tinfo->tags = taosArrayDup(preLineKV, NULL); smlSetCTableName(tinfo); - tinfo->uid = info->uid++; + getTableUid(info, elements, tinfo); if (info->dataFormat) { info->currSTableMeta->uid = tinfo->uid; tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta); diff --git a/source/client/src/clientSmlLine.c b/source/client/src/clientSmlLine.c index 1732473c11..2f7e8a0f97 100644 --- a/source/client/src/clientSmlLine.c +++ b/source/client/src/clientSmlLine.c @@ -312,7 +312,7 @@ static int32_t smlParseTagKv(SSmlHandle *info, char **sql, char *sqlEnd, SSmlLin } smlSetCTableName(tinfo); - tinfo->uid = info->uid++; + getTableUid(info, currElement, tinfo); if (info->dataFormat) { info->currSTableMeta->uid = tinfo->uid; tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta); diff --git a/source/client/src/clientSmlTelnet.c b/source/client/src/clientSmlTelnet.c index 036442573d..c5dd20ba7b 100644 --- a/source/client/src/clientSmlTelnet.c +++ b/source/client/src/clientSmlTelnet.c @@ -206,7 +206,7 @@ static int32_t smlParseTelnetTags(SSmlHandle *info, char *data, char *sqlEnd, SS tinfo->tags = taosArrayDup(preLineKV, NULL); smlSetCTableName(tinfo); - tinfo->uid = info->uid++; + getTableUid(info, elements, tinfo); if (info->dataFormat) { info->currSTableMeta->uid = tinfo->uid; tinfo->tableDataCtx = smlInitTableDataCtx(info->pQuery, info->currSTableMeta); diff --git a/tests/system-test/2-query/sml.py b/tests/system-test/2-query/sml.py index f96ed8a3ff..519957f6f9 100644 --- a/tests/system-test/2-query/sml.py +++ b/tests/system-test/2-query/sml.py @@ -34,6 +34,7 @@ class TDTestCase: if ret != 0: tdLog.info("sml_test ret != 0") + tdSql.query(f"select * from ts3303.stb2") # tdSql.execute('use sml_db') tdSql.query(f"select * from {dbname}.t_b7d815c9222ca64cdf2614c61de8f211") tdSql.checkRows(1) diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index f1f4bbc1fd..ffcd5d1b2e 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1159,6 +1159,44 @@ int sml_td23881_Test() { return code; } +int sml_ts3303_Test() { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + + TAOS_RES *pRes = taos_query(taos, "drop database if exists ts3303"); + taos_free_result(pRes); + + pRes = taos_query(taos, "create database if not exists ts3303"); + taos_free_result(pRes); + + const char *sql[] = { + "stb2,t1=1,dataModelName=t0 f1=283i32 1632299372000", + "stb2,t1=1,dataModelName=t0 f1=106i32 1632299378000", + "stb2,t1=4,dataModelName=t0 f1=144i32 1629716944000", + "stb2,t1=4,dataModelName=t0 f1=125i32 1629717012000", + "stb2,t1=4,dataModelName=t0 f1=144i32 1629717012000", + "stb2,t1=4,dataModelName=t0 f1=107i32 1629717013000", + "stb2,t1=6,dataModelName=t0 f1=154i32 1629717140000", + "stb2,t1=6,dataModelName=t0 f1=93i32 1629717140000", + "stb2,t1=6,dataModelName=t0 f1=134i32 1629717140000", + "stb2,t1=4,dataModelName=t0 f1=73i32 1629717140000", + "stb2,t1=4,dataModelName=t0 f1=83i32 1629717140000", + "stb2,t1=4,dataModelName=t0 f1=72i32 1629717140000", + }; + + pRes = taos_query(taos, "use ts3303"); + taos_free_result(pRes); + + pRes = taos_schemaless_insert_ttl(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_MILLI_SECONDS, 20); + + int code = taos_errno(pRes); + printf("%s result1:%s\n", __FUNCTION__, taos_errstr(pRes)); + taos_free_result(pRes); + taos_close(taos); + + return code; +} + int sml_ttl_Test() { TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -1336,6 +1374,9 @@ int main(int argc, char *argv[]) { ASSERT(!ret); ret = sml_ts2385_Test(); // this test case need config sml table name using ./sml_test config_file ASSERT(!ret); + ret = sml_ts3303_Test(); // this test case need config sml table name using ./sml_test config_file + ASSERT(!ret); + // for(int i = 0; i < sizeof(str)/sizeof(str[0]); i++){ // printf("str:%s \t %d\n", str[i], smlCalTypeSum(str[i], strlen(str[i]))); // } From 3fb2d7656d2d39941aaa25b29759a7dd9bf81f0e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 5 May 2023 13:39:37 +0800 Subject: [PATCH 05/33] fix:[TS-3303]use stable name + child table name as key to save uid to avoid multi items for one table in submit block --- tests/system-test/2-query/sml.py | 2 ++ utils/test/c/sml_test.c | 13 +++++++++++++ 2 files changed, 15 insertions(+) diff --git a/tests/system-test/2-query/sml.py b/tests/system-test/2-query/sml.py index 519957f6f9..2f97118fbf 100644 --- a/tests/system-test/2-query/sml.py +++ b/tests/system-test/2-query/sml.py @@ -35,6 +35,8 @@ class TDTestCase: tdLog.info("sml_test ret != 0") tdSql.query(f"select * from ts3303.stb2") + tdSql.query(f"select * from ts3303.meters") + # tdSql.execute('use sml_db') tdSql.query(f"select * from {dbname}.t_b7d815c9222ca64cdf2614c61de8f211") tdSql.checkRows(1) diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index ffcd5d1b2e..f1dc8ebe79 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -1183,6 +1183,11 @@ int sml_ts3303_Test() { "stb2,t1=4,dataModelName=t0 f1=72i32 1629717140000", }; + const char *sql1[] = { + "meters,location=California.LosAngeles,groupid=2 current=11.8,voltage=221,phase=\"2022-02-0210:22:22\" 1626006833339000000", + "meters,groupid=2,location=California.LosAngeles current=11.8,voltage=221,phase=\"2022-02-0210:22:22\" 1626006833339000000", + }; + pRes = taos_query(taos, "use ts3303"); taos_free_result(pRes); @@ -1190,8 +1195,16 @@ int sml_ts3303_Test() { TSDB_SML_TIMESTAMP_MILLI_SECONDS, 20); int code = taos_errno(pRes); + printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); + taos_free_result(pRes); + ASSERT(code == 0); + + pRes = taos_schemaless_insert_ttl(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_NANO_SECONDS, 20); + printf("%s result1:%s\n", __FUNCTION__, taos_errstr(pRes)); taos_free_result(pRes); + taos_close(taos); return code; From 0a469484fa7159e70f754b197429554fe5006344 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 5 May 2023 15:01:27 +0800 Subject: [PATCH 06/33] fix:[TS-3277]table not exist if drop stable in another client when insert data using schemaless interface --- include/util/taoserror.h | 2 +- source/client/src/clientSml.c | 15 +++++++-------- source/util/src/terror.c | 2 +- 3 files changed, 9 insertions(+), 10 deletions(-) diff --git a/include/util/taoserror.h b/include/util/taoserror.h index ab89466a19..082d0dde47 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -146,7 +146,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_TSC_CONN_KILLED TAOS_DEF_ERROR_CODE(0, 0x0215) #define TSDB_CODE_TSC_SQL_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x0216) #define TSDB_CODE_TSC_DB_NOT_SELECTED TAOS_DEF_ERROR_CODE(0, 0x0217) -#define TSDB_CODE_TSC_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0218) +//#define TSDB_CODE_TSC_INVALID_TABLE_NAME TAOS_DEF_ERROR_CODE(0, 0x0218) #define TSDB_CODE_TSC_EXCEED_SQL_LIMIT TAOS_DEF_ERROR_CODE(0, 0x0219) #define TSDB_CODE_TSC_FILE_EMPTY TAOS_DEF_ERROR_CODE(0, 0x021A) #define TSDB_CODE_TSC_LINE_SYNTAX_ERROR TAOS_DEF_ERROR_CODE(0, 0x021B) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index dd3f50f440..2d86f1989d 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1320,17 +1320,15 @@ static int32_t smlInsertData(SSmlHandle *info) { if (info->pRequest->dbList == NULL) { info->pRequest->dbList = taosArrayInit(1, TSDB_DB_FNAME_LEN); } - void *data = taosArrayReserve(info->pRequest->dbList, 1); - memcpy(data, info->pRequest->pDb, - TSDB_DB_FNAME_LEN > strlen(info->pRequest->pDb) ? strlen(info->pRequest->pDb) : TSDB_DB_FNAME_LEN); + char *data = (char*)taosArrayReserve(info->pRequest->dbList, 1); + SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; + tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname)); + tNameGetFullDbName(&pName, data); SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashIterate(info->childTables, NULL); while (oneTable) { SSmlTableInfo *tableData = *oneTable; - - SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; - tstrncpy(pName.dbname, info->pRequest->pDb, sizeof(pName.dbname)); - memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName)); + tstrncpy(pName.tname, tableData->sTableName, tableData->sTableNameLen + 1); if (info->pRequest->tableList == NULL) { info->pRequest->tableList = taosArrayInit(1, sizeof(SName)); @@ -1647,7 +1645,8 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine, info->cost.endTime = taosGetTimestampUs(); info->cost.code = code; if (code == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || code == TSDB_CODE_SDB_OBJ_CREATING || - code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT) { + code == TSDB_CODE_PAR_VALUE_TOO_LONG || code == TSDB_CODE_MND_TRANS_CONFLICT || + code == TSDB_CODE_PAR_TABLE_NOT_EXIST) { if (cnt++ >= 10) { uInfo("SML:%" PRIx64 " retry:%d/10 end code:%d, msg:%s", info->id, cnt, code, tstrerror(code)); break; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 002d605793..09cb1a20cb 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -122,7 +122,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TSC_NO_WRITE_AUTH, "No write permission") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_CONN_KILLED, "Connection killed") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_SQL_SYNTAX_ERROR, "Syntax error in SQL") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_DB_NOT_SELECTED, "Database not specified or available") -TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, "Table does not exist") +//TAOS_DEFINE_ERROR(TSDB_CODE_TSC_INVALID_TABLE_NAME, "Table does not exist") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_EXCEED_SQL_LIMIT, "SQL statement too long") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_FILE_EMPTY, "File is empty") TAOS_DEFINE_ERROR(TSDB_CODE_TSC_LINE_SYNTAX_ERROR, "Syntax error in Line") From fdee112ae4c970baa6ce83b0b834b1b6a13cb69b Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 5 May 2023 17:31:08 +0800 Subject: [PATCH 07/33] fix:[TS-3277]table not exist if drop stable in another client when insert data using schemaless interface --- source/client/src/clientSml.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 2d86f1989d..2fb20aca1b 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1335,6 +1335,8 @@ static int32_t smlInsertData(SSmlHandle *info) { } taosArrayPush(info->pRequest->tableList, &pName); + tstrncpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName) + 1); + SRequestConnInfo conn = {0}; conn.pTrans = info->taos->pAppInfo->pTransporter; conn.requestId = info->pRequest->requestId; From 29fcd1b83aec652621cb27d2cb4af008ad187d44 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 6 May 2023 10:08:05 +0800 Subject: [PATCH 08/33] refactor: do some internal refactor. --- source/dnode/vnode/inc/vnode.h | 9 +- source/dnode/vnode/src/inc/vnodeInt.h | 1 - source/dnode/vnode/src/tq/tq.c | 85 ++------ source/dnode/vnode/src/tq/tqPush.c | 258 +----------------------- source/dnode/vnode/src/tq/tqRead.c | 53 ++--- source/dnode/vnode/src/tq/tqScan.c | 7 +- source/dnode/vnode/src/tq/tqUtil.c | 9 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 1 - source/libs/executor/inc/querytask.h | 2 +- source/libs/executor/src/executor.c | 1 + source/libs/executor/src/scanoperator.c | 9 +- source/libs/stream/src/streamExec.c | 3 +- source/libs/wal/src/walRead.c | 1 + 13 files changed, 58 insertions(+), 381 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index c7424cd233..88460cd3ca 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -255,14 +255,13 @@ int32_t tqReaderAddTbUidList(STqReader *pReader, const SArray *pTableUidList); int32_t tqReaderRemoveTbUidList(STqReader *pReader, const SArray *tbUidList); int32_t tqSeekVer(STqReader *pReader, int64_t ver, const char *id); -int32_t tqNextBlock(STqReader *pReader, SSDataBlock* pBlock); int32_t tqNextBlockInWal(STqReader* pReader); -int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData); - -int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); bool tqNextBlockImpl(STqReader *pReader); + +int32_t extractSubmitMsgFromWal(SWalReader *pReader, SPackedData *pPackedData); +int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver); bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids); -int32_t tqRetrieveDataBlock(SSDataBlock *pBlock, STqReader *pReader, SSubmitTbData **pSubmitTbDataRet); +int32_t tqRetrieveDataBlock(STqReader *pReader, SSubmitTbData **pSubmitTbDataRet); int32_t tqRetrieveTaosxBlock(STqReader *pReader, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet); int32_t vnodeEnqueueStreamMsg(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index eb2787595b..1aea479511 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -212,7 +212,6 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, int64_t version, char* msg, int32_t msg int32_t tqProcessTaskDropReq(STQ* pTq, int64_t version, char* msg, int32_t msgLen); int32_t tqProcessStreamTaskCheckReq(STQ* pTq, SRpcMsg* pMsg); int32_t tqProcessStreamTaskCheckRsp(STQ* pTq, int64_t version, char* msg, int32_t msgLen); -int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit); int32_t tqProcessSubmitReqForSubscribe(STQ* pTq); int32_t tqProcessDelReq(STQ* pTq, void* pReq, int32_t len, int64_t ver); int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 12b81b6c3f..4997db684f 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -1069,12 +1069,15 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { int32_t vgId = TD_VID(pTq->pVnode); taosWLockLatch(&pTq->lock); - if(taosHashGetSize(pTq->pPushMgr) > 0){ - void *pIter = taosHashIterate(pTq->pPushMgr, NULL); - while(pIter){ + + if (taosHashGetSize(pTq->pPushMgr) > 0) { + void* pIter = taosHashIterate(pTq->pPushMgr, NULL); + + while (pIter) { STqHandle* pHandle = *(STqHandle**)pIter; - tqDebug("vgId:%d start set submit for pHandle:%p, consume id:0x%"PRIx64, vgId, pHandle, pHandle->consumerId); - if(ASSERT(pHandle->msg != NULL)){ + tqDebug("vgId:%d start set submit for pHandle:%p, consumer:0x%" PRIx64, vgId, pHandle, pHandle->consumerId); + + if (ASSERT(pHandle->msg != NULL)) { tqError("pHandle->msg should not be null"); break; }else{ @@ -1083,77 +1086,15 @@ int32_t tqProcessSubmitReqForSubscribe(STQ* pTq) { taosMemoryFree(pHandle->msg); pHandle->msg = NULL; } + pIter = taosHashIterate(pTq->pPushMgr, pIter); } + taosHashClear(pTq->pPushMgr); } + // unlock taosWUnLockLatch(&pTq->lock); - - return 0; -} - -int32_t tqProcessSubmitReq(STQ* pTq, SPackedData submit) { -#if 0 - void* pIter = NULL; - SStreamDataSubmit2* pSubmit = streamDataSubmitNew(submit, STREAM_INPUT__DATA_SUBMIT); - if (pSubmit == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - tqError("failed to create data submit for stream since out of memory"); - saveOffsetForAllTasks(pTq, submit.ver); - return -1; - } - - SArray* pInputQueueFullTasks = taosArrayInit(4, POINTER_BYTES); - - while (1) { - pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter); - if (pIter == NULL) { - break; - } - - SStreamTask* pTask = *(SStreamTask**)pIter; - if (pTask->taskLevel != TASK_LEVEL__SOURCE) { - continue; - } - - if (pTask->status.taskStatus == TASK_STATUS__RECOVER_PREPARE || pTask->status.taskStatus == TASK_STATUS__WAIT_DOWNSTREAM) { - tqDebug("stream task:%d skip push data, not ready for processing, status %d", pTask->id.taskId, - pTask->status.taskStatus); - continue; - } - - // check if offset value exists - char key[128] = {0}; - createStreamTaskOffsetKey(key, pTask->id.streamId, pTask->id.taskId); - - if (tInputQueueIsFull(pTask)) { - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); - - int64_t ver = submit.ver; - if (pOffset == NULL) { - doSaveTaskOffset(pTq->pOffsetStore, key, submit.ver); - } else { - ver = pOffset->val.version; - } - - tqDebug("s-task:%s input queue is full, discard submit block, ver:%" PRId64, pTask->id.idStr, ver); - taosArrayPush(pInputQueueFullTasks, &pTask); - continue; - } - - // check if offset value exists - STqOffset* pOffset = tqOffsetRead(pTq->pOffsetStore, key); - ASSERT(pOffset == NULL); - - addSubmitBlockNLaunchTask(pTq->pOffsetStore, pTask, pSubmit, key, submit.ver); - } - - streamDataSubmitDestroy(pSubmit); - taosFreeQitem(pSubmit); -#endif - - tqStartStreamTasks(pTq); return 0; } @@ -1323,9 +1264,9 @@ FAIL: int32_t tqCheckLogInWal(STQ* pTq, int64_t sversion) { return sversion <= pTq->walLogLastVer; } int32_t tqStartStreamTasks(STQ* pTq) { - int32_t vgId = TD_VID(pTq->pVnode); - + int32_t vgId = TD_VID(pTq->pVnode); SStreamMeta* pMeta = pTq->pStreamMeta; + taosWLockLatch(&pMeta->lock); int32_t numOfTasks = taosHashGetSize(pTq->pStreamMeta->pTasks); if (numOfTasks == 0) { diff --git a/source/dnode/vnode/src/tq/tqPush.c b/source/dnode/vnode/src/tq/tqPush.c index a914517645..c8195f72a9 100644 --- a/source/dnode/vnode/src/tq/tqPush.c +++ b/source/dnode/vnode/src/tq/tqPush.c @@ -16,250 +16,10 @@ #include "tq.h" #include "vnd.h" -#if 0 -void tqTmrRspFunc(void* param, void* tmrId) { - STqHandle* pHandle = (STqHandle*)param; - atomic_store_8(&pHandle->pushHandle.tmrStopped, 1); -} - -static int32_t tqLoopExecFromQueue(STQ* pTq, STqHandle* pHandle, SStreamDataSubmit** ppSubmit, SMqDataRsp* pRsp) { - SStreamDataSubmit* pSubmit = *ppSubmit; - while (pSubmit != NULL) { - if (tqLogScanExec(pTq, &pHandle->execHandle, pSubmit->data, pRsp, 0) < 0) { - } - // update processed - atomic_store_64(&pHandle->pushHandle.processedVer, pSubmit->ver); - streamQueueProcessSuccess(&pHandle->pushHandle.inputQ); - streamDataSubmitDestroy(pSubmit); - if (pRsp->blockNum > 0) { - *ppSubmit = pSubmit; - return 0; - } else { - pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); - } - } - *ppSubmit = pSubmit; - return -1; -} - -int32_t tqExecFromInputQ(STQ* pTq, STqHandle* pHandle) { - SMqDataRsp rsp = {0}; - // 1. guard and set status executing - int8_t execStatus = atomic_val_compare_exchange_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE, - TASK_EXEC_STATUS__EXECUTING); - if (execStatus == TASK_EXEC_STATUS__IDLE) { - SStreamDataSubmit* pSubmit = NULL; - // 2. check processedVer - // 2.1. if not missed, get msg from queue - // 2.2. if missed, scan wal - pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); - while (pHandle->pushHandle.processedVer <= pSubmit->ver) { - // read from wal - } - while (pHandle->pushHandle.processedVer > pSubmit->ver + 1) { - streamQueueProcessSuccess(&pHandle->pushHandle.inputQ); - streamDataSubmitDestroy(pSubmit); - pSubmit = streamQueueNextItem(&pHandle->pushHandle.inputQ); - if (pSubmit == NULL) break; - } - // 3. exec, after each success, update processed ver - // first run - if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) { - goto SEND_RSP; - } - // set exec status closing - atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__CLOSING); - // second run - if (tqLoopExecFromQueue(pTq, pHandle, &pSubmit, &rsp) == 0) { - goto SEND_RSP; - } - // set exec status idle - atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE); - } -SEND_RSP: - // 4. if get result - // 4.1 set exec input status blocked and exec status idle - atomic_store_8(&pHandle->pushHandle.execStatus, TASK_EXEC_STATUS__IDLE); - // 4.2 rpc send - rsp.rspOffset = pHandle->pushHandle.processedVer; - /*if (tqSendPollRsp(pTq, pMsg, pReq, &rsp) < 0) {*/ - /*return -1;*/ - /*}*/ - // 4.3 clear rpc info - memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); - return 0; -} - -int32_t tqOpenPushHandle(STQ* pTq, STqHandle* pHandle) { - memset(&pHandle->pushHandle, 0, sizeof(STqPushHandle)); - pHandle->pushHandle.inputQ.queue = taosOpenQueue(); - pHandle->pushHandle.inputQ.qall = taosAllocateQall(); - if (pHandle->pushHandle.inputQ.queue == NULL || pHandle->pushHandle.inputQ.qall == NULL) { - if (pHandle->pushHandle.inputQ.queue) { - taosCloseQueue(pHandle->pushHandle.inputQ.queue); - } - if (pHandle->pushHandle.inputQ.qall) { - taosFreeQall(pHandle->pushHandle.inputQ.qall); - } - return -1; - } - return 0; -} - -int32_t tqPreparePush(STQ* pTq, STqHandle* pHandle, int64_t reqId, const SRpcHandleInfo* pInfo, int64_t processedVer, - int64_t timeout) { - memcpy(&pHandle->pushHandle.rpcInfo, pInfo, sizeof(SRpcHandleInfo)); - atomic_store_64(&pHandle->pushHandle.reqId, reqId); - atomic_store_64(&pHandle->pushHandle.processedVer, processedVer); - atomic_store_8(&pHandle->pushHandle.inputStatus, TASK_INPUT_STATUS__NORMAL); - atomic_store_8(&pHandle->pushHandle.tmrStopped, 0); - taosTmrReset(tqTmrRspFunc, (int32_t)timeout, pHandle, tqMgmt.timer, &pHandle->pushHandle.timerId); - return 0; -} - -int32_t tqEnqueue(STqHandle* pHandle, SStreamDataSubmit* pSubmit) { - int8_t inputStatus = atomic_load_8(&pHandle->pushHandle.inputStatus); - if (inputStatus == TASK_INPUT_STATUS__NORMAL) { - SStreamDataSubmit* pSubmitClone = streamSubmitBlockClone(pSubmit); - if (pSubmitClone == NULL) { - return -1; - } - taosWriteQitem(pHandle->pushHandle.inputQ.queue, pSubmitClone); - return 0; - } - return -1; -} - -int32_t tqSendExecReq(STQ* pTq, STqHandle* pHandle) { - // - return 0; -} - -int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver, SRpcHandleInfo handleInfo) { - if (msgType != TDMT_VND_SUBMIT) return 0; - void* pIter = NULL; - STqHandle* pHandle = NULL; - SSubmitReq* pReq = (SSubmitReq*)msg; - int32_t workerId = 4; - int64_t fetchOffset = ver; - - while (1) { - pIter = taosHashIterate(pTq->pushMgr, pIter); - if (pIter == NULL) break; - pHandle = *(STqHandle**)pIter; - - taosWLockLatch(&pHandle->pushHandle.lock); - - SMqDataRsp rsp = {0}; - rsp.reqOffset = pHandle->pushHandle.reqOffset; - rsp.blockData = taosArrayInit(0, sizeof(void*)); - rsp.blockDataLen = taosArrayInit(0, sizeof(int32_t)); - - if (msgType == TDMT_VND_SUBMIT) { - tqLogScanExec(pTq, &pHandle->execHandle, pReq, &rsp, workerId); - } else { - tqError("tq push unexpected msg type %d", msgType); - } - - if (rsp.blockNum == 0) { - taosWUnLockLatch(&pHandle->pushHandle.lock); - continue; - } - - rsp.rspOffset = fetchOffset; - - int32_t tlen = sizeof(SMqRspHead) + tEncodeSMqDataBlkRsp(NULL, &rsp); - void* buf = rpcMallocCont(tlen); - if (buf == NULL) { - // todo free - return -1; - } - - ((SMqRspHead*)buf)->mqMsgType = TMQ_MSG_TYPE__POLL_RSP; - ((SMqRspHead*)buf)->epoch = pHandle->pushHandle.epoch; - ((SMqRspHead*)buf)->consumerId = pHandle->pushHandle.consumerId; - - void* abuf = POINTER_SHIFT(buf, sizeof(SMqRspHead)); - tEncodeSMqDataBlkRsp(&abuf, &rsp); - - SRpcMsg resp = { - .info = pHandle->pushHandle.rpcInfo, - .pCont = buf, - .contLen = tlen, - .code = 0, - }; - tmsgSendRsp(&resp); - - memset(&pHandle->pushHandle.rpcInfo, 0, sizeof(SRpcHandleInfo)); - taosWUnLockLatch(&pHandle->pushHandle.lock); - - tqDebug("vgId:%d offset %" PRId64 " from consumer:%" PRId64 ", (epoch %d) send rsp, block num: %d, req:%" PRId64 ", rsp:%" PRId64, - TD_VID(pTq->pVnode), fetchOffset, pHandle->pushHandle.consumerId, pHandle->pushHandle.epoch, rsp.blockNum, - rsp.reqOffset, rsp.rspOffset); - - // TODO destroy - taosArrayDestroy(rsp.blockData); - taosArrayDestroy(rsp.blockDataLen); - } - - return 0; -} -#endif - int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) { -// void* pReq = POINTER_SHIFT(msg, sizeof(SSubmitReq2Msg)); -// int32_t len = msgLen - sizeof(SSubmitReq2Msg); -// int32_t vgId = TD_VID(pTq->pVnode); if (msgType == TDMT_VND_SUBMIT) { tqProcessSubmitReqForSubscribe(pTq); - // lock push mgr to avoid potential msg lost -// taosWLockLatch(&pTq->lock); -// -// int32_t numOfRegisteredPush = taosHashGetSize(pTq->pPushMgr); -// if (numOfRegisteredPush > 0) { -// tqDebug("vgId:%d tq push msg version:%" PRId64 " type:%s, head:%p, body:%p len:%d, numOfPushed consumers:%d", -// vgId, ver, TMSG_INFO(msgType), msg, pReq, len, numOfRegisteredPush); -// -// void* data = taosMemoryMalloc(len); -// if (data == NULL) { -// terrno = TSDB_CODE_OUT_OF_MEMORY; -// tqError("failed to copy data for stream since out of memory, vgId:%d", vgId); -// taosWUnLockLatch(&pTq->lock); -// return -1; -// } -// -// memcpy(data, pReq, len); -// -// SArray* cachedKey = taosArrayInit(0, sizeof(SItem)); -// void* pIter = NULL; -// -// while (1) { -// pIter = taosHashIterate(pTq->pPushMgr, pIter); -// if (pIter == NULL) { -// break; -// } -// -// STqPushEntry* pPushEntry = *(STqPushEntry**)pIter; -// -// STqHandle* pHandle = taosHashGet(pTq->pHandle, pPushEntry->subKey, strlen(pPushEntry->subKey)); -// if (pHandle == NULL) { -// tqDebug("vgId:%d, failed to find handle %s in pushing data to consumer, ignore", pTq->pVnode->config.vgId, -// pPushEntry->subKey); -// continue; -// } -// -// STqExecHandle* pExec = &pHandle->execHandle; -// doPushDataForEntry(pIter, pExec, pTq, ver, vgId, data, len, cachedKey); -// } -// -// doRemovePushedEntry(cachedKey, pTq); -// taosArrayDestroyEx(cachedKey, freeItem); -// taosMemoryFree(data); -// } -// -// // unlock -// taosWUnLockLatch(&pTq->lock); } tqDebug("handle submit, restore:%d, size:%d", pTq->pVnode->restored, (int)taosHashGetSize(pTq->pStreamMeta->pTasks)); @@ -274,8 +34,7 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v } if (msgType == TDMT_VND_SUBMIT) { - SPackedData submit = {0}; - tqProcessSubmitReq(pTq, submit); + tqStartStreamTasks(pTq); } if (msgType == TDMT_VND_DELETE) { @@ -286,16 +45,16 @@ int32_t tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t v return 0; } - int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { int32_t vgId = TD_VID(pTq->pVnode); - STqHandle* pHandle = (STqHandle*) handle; - if(pHandle->msg == NULL){ + STqHandle* pHandle = (STqHandle*)handle; + + if (pHandle->msg == NULL) { pHandle->msg = taosMemoryCalloc(1, sizeof(SRpcMsg)); memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = rpcMallocCont(pMsg->contLen); - }else{ - void *tmp = pHandle->msg->pCont; + } else { + void* tmp = pHandle->msg->pCont; memcpy(pHandle->msg, pMsg, sizeof(SRpcMsg)); pHandle->msg->pCont = tmp; } @@ -303,7 +62,8 @@ int32_t tqRegisterPushHandle(STQ* pTq, void* handle, SRpcMsg* pMsg) { memcpy(pHandle->msg->pCont, pMsg->pCont, pMsg->contLen); pHandle->msg->contLen = pMsg->contLen; int32_t ret = taosHashPut(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey), &pHandle, POINTER_BYTES); - tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); + tqDebug("vgId:%d data is over, ret:%d, consumerId:0x%" PRIx64 ", register to pHandle:%p, pCont:%p, len:%d", vgId, ret, + pHandle->consumerId, pHandle, pHandle->msg->pCont, pHandle->msg->contLen); return 0; } @@ -313,6 +73,7 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { int32_t ret = taosHashRemove(pTq->pPushMgr, pHandle->subKey, strlen(pHandle->subKey)); tqError("vgId:%d remove pHandle:%p,ret:%d consumer Id:0x%" PRIx64, vgId, pHandle, ret, pHandle->consumerId); + if(pHandle->msg != NULL) { tqPushDataRsp(pTq, pHandle); @@ -320,5 +81,6 @@ int32_t tqUnregisterPushHandle(STQ* pTq, void *handle) { taosMemoryFree(pHandle->msg); pHandle->msg = NULL; } + return 0; } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 082e31ea91..7ed77edd5b 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -332,6 +332,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) { if (pBlockList == NULL || pReader->nextBlk >= taosArrayGetSize(pBlockList)) { // try next message in wal file + // todo always retry to avoid read failure caused by wal file deletion if (walNextValidMsg(pWalReader) < 0) { return FETCH_TYPE__NONE; } @@ -374,7 +375,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) { SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pReader->tbIdHash == NULL) { - int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL); + int32_t code = tqRetrieveDataBlock(pReader, NULL); if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) { return FETCH_TYPE__DATA; } @@ -384,7 +385,7 @@ int32_t tqNextBlockInWal(STqReader* pReader) { if (ret != NULL) { tqDebug("tq reader return submit block, uid:%"PRId64", ver:%"PRId64, pSubmitTbData->uid, pReader->msg.ver); - int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL); + int32_t code = tqRetrieveDataBlock(pReader, NULL); if (code == TSDB_CODE_SUCCESS && pReader->pResBlock->info.rows > 0) { return FETCH_TYPE__DATA; } @@ -399,31 +400,6 @@ int32_t tqNextBlockInWal(STqReader* pReader) { } } -int32_t tqNextBlock(STqReader* pReader, SSDataBlock* pBlock) { - while (1) { - if (pReader->msg.msgStr == NULL) { - if (walNextValidMsg(pReader->pWalReader) < 0) { - return FETCH_TYPE__NONE; - } - - void* pBody = POINTER_SHIFT(pReader->pWalReader->pHead->head.body, sizeof(SSubmitReq2Msg)); - int32_t bodyLen = pReader->pWalReader->pHead->head.bodyLen - sizeof(SSubmitReq2Msg); - int64_t ver = pReader->pWalReader->pHead->head.version; - - tqReaderSetSubmitMsg(pReader, pBody, bodyLen, ver); - } - - while (tqNextBlockImpl(pReader)) { - int32_t code = tqRetrieveDataBlock(pReader->pResBlock, pReader, NULL); - if (code != TSDB_CODE_SUCCESS || pBlock->info.rows == 0) { - continue; - } - - return FETCH_TYPE__DATA; - } - } -} - int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { pReader->msg.msgStr = msgStr; pReader->msg.msgLen = msgLen; @@ -527,7 +503,7 @@ int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrap return 0; } -int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { +int32_t tqRetrieveDataBlock(STqReader* pReader, SSubmitTbData** pSubmitTbDataRet) { tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk++); @@ -535,6 +511,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa *pSubmitTbDataRet = pSubmitTbData; } + SSDataBlock* pBlock = pReader->pResBlock; blockDataCleanup(pBlock); int32_t sversion = pSubmitTbData->sver; @@ -603,7 +580,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa SColumnInfoData colInfo = createColumnInfoData(pColSchema->type, pColSchema->bytes, pColSchema->colId); int32_t code = blockDataAppendColInfo(pBlock, &colInfo); if (code != TSDB_CODE_SUCCESS) { - goto FAIL; + return -1; } i++; j++; @@ -622,7 +599,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa if (blockDataEnsureCapacity(pBlock, numOfRows) < 0) { terrno = TSDB_CODE_OUT_OF_MEMORY; - goto FAIL; + return -1; } pBlock->info.rows = numOfRows; @@ -638,7 +615,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa while (targetIdx < colActual) { if (sourceIdx >= numOfCols) { tqError("tqRetrieveDataBlock sourceIdx:%d >= numOfCols:%d", sourceIdx, numOfCols); - goto FAIL; + return -1; } SColData* pCol = taosArrayGet(pCols, sourceIdx); @@ -647,7 +624,7 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa if (pCol->nVal != numOfRows) { tqError("tqRetrieveDataBlock pCol->nVal:%d != numOfRows:%d", pCol->nVal, numOfRows); - goto FAIL; + return -1; } if (pCol->cid < pColData->info.colId) { @@ -661,14 +638,14 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); varDataSetLen(val, colVal.value.nData); if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - goto FAIL; + return -1; } } else { colDataSetNULL(pColData, i); } } else { if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - goto FAIL; + return -1; } } } @@ -710,14 +687,14 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa memcpy(varDataVal(val), colVal.value.pData, colVal.value.nData); varDataSetLen(val, colVal.value.nData); if (colDataAppend(pColData, i, val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - goto FAIL; + return -1; } } else { colDataSetNULL(pColData, i); } } else { if (colDataAppend(pColData, i, (void*)&colVal.value.val, !COL_VAL_IS_VALUE(&colVal)) < 0) { - goto FAIL; + return -1; } } @@ -735,10 +712,6 @@ int32_t tqRetrieveDataBlock(SSDataBlock* pBlock, STqReader* pReader, SSubmitTbDa } return 0; - -FAIL: - blockDataFreeRes(pBlock); - return -1; } int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet) { diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 3d9cea54ba..800bcc8b71 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -66,9 +66,10 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, STaosxRsp* pRsp, in int32_t tqScanData(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset) { const int32_t MAX_ROWS_TO_RETURN = 4096; - int32_t vgId = TD_VID(pTq->pVnode); - int32_t code = 0; - int32_t totalRows = 0; + + int32_t vgId = TD_VID(pTq->pVnode); + int32_t code = 0; + int32_t totalRows = 0; const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 133c51a8dc..d83345ad59 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -175,7 +175,7 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, goto end; } -// till now, all data has been transferred to consumer, new data needs to push client once arrived. + // till now, all data has been transferred to consumer, new data needs to push client once arrived. if (dataRsp.blockNum == 0 && dataRsp.reqOffset.type == TMQ_OFFSET__LOG && dataRsp.reqOffset.version == dataRsp.rspOffset.version && pHandle->consumerId == pRequest->consumerId) { // lock @@ -361,11 +361,10 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ // this is a normal subscribe requirement if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { return extractDataAndRspForNormalSubscribe(pTq, pHandle, pRequest, pMsg, &offset); + } else { // todo handle the case where re-balance occurs. + // for taosx + return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset); } - - // todo handle the case where re-balance occurs. - // for taosx - return extractDataAndRspForDbStbSubscribe(pTq, pHandle, pRequest, pMsg, &offset); } int32_t tqSendMetaPollRsp(STQ* pTq, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqMetaRsp* pRsp) { diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index f8161427db..c608403456 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -448,7 +448,6 @@ int32_t vnodeProcessWriteMsg(SVnode *pVnode, SRpcMsg *pMsg, int64_t version, SRp walApplyVer(pVnode->pWal, version); if (tqPushMsg(pVnode->pTq, pMsg->pCont, pMsg->contLen, pMsg->msgType, version) < 0) { -// /*vInfo("vgId:%d, push msg end", pVnode->config.vgId);*/ vError("vgId:%d, failed to push msg to TQ since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } diff --git a/source/libs/executor/inc/querytask.h b/source/libs/executor/inc/querytask.h index 8852265da0..37c93fef5c 100644 --- a/source/libs/executor/inc/querytask.h +++ b/source/libs/executor/inc/querytask.h @@ -59,7 +59,7 @@ typedef struct { STqOffsetVal currentOffset; // for tmq SMqMetaRsp metaRsp; // for tmq fetching meta int64_t snapshotVer; - SPackedData submit; // todo remove it +// SPackedData submit; // todo remove it SSchemaWrapper* schema; char tbName[TSDB_TABLE_NAME_LEN]; // this is the current scan table: todo refactor int8_t recoverStep; diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 5fc079b7c1..2d5830e4a9 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1080,6 +1080,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT STableListInfo* pTableListInfo = pScanBaseInfo->pTableListInfo; if (pOffset->type == TMQ_OFFSET__LOG) { + // todo refactor: move away tsdbReaderClose(pScanBaseInfo->dataReader); pScanBaseInfo->dataReader = NULL; diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c index 9b1b5235cc..7cb3c00c1a 100644 --- a/source/libs/executor/src/scanoperator.c +++ b/source/libs/executor/src/scanoperator.c @@ -1636,6 +1636,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { qDebug("start to exec queue scan, %s", id); +#if 0 if (pTaskInfo->streamInfo.submit.msgStr != NULL) { if (pInfo->tqReader->msg.msgStr == NULL) { SPackedData submit = pTaskInfo->streamInfo.submit; @@ -1649,7 +1650,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { SDataBlockInfo* pBlockInfo = &pInfo->pRes->info; while (tqNextBlockImpl(pInfo->tqReader)) { - int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL); + int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { continue; } @@ -1665,6 +1666,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { pTaskInfo->streamInfo.submit = (SPackedData){0}; return NULL; } +#endif if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__SNAPSHOT_DATA) { SSDataBlock* pResult = doTableScan(pInfo->pTableScanOp); @@ -1682,10 +1684,12 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) { if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1, pTaskInfo->id.str) < 0) { return NULL; } + tqOffsetResetToLog(&pTaskInfo->streamInfo.currentOffset, pTaskInfo->streamInfo.snapshotVer); } if (pTaskInfo->streamInfo.currentOffset.type == TMQ_OFFSET__LOG) { + while (1) { int32_t type = tqNextBlockInWal(pInfo->tqReader); SSDataBlock* pRes = pInfo->tqReader->pResBlock; @@ -2071,7 +2075,7 @@ FETCH_NEXT_BLOCK: blockDataCleanup(pInfo->pRes); while (tqNextBlockImpl(pInfo->tqReader)) { - int32_t code = tqRetrieveDataBlock(pInfo->tqReader->pResBlock, pInfo->tqReader, NULL); + int32_t code = tqRetrieveDataBlock(pInfo->tqReader, NULL); if (code != TSDB_CODE_SUCCESS || pInfo->tqReader->pResBlock->info.rows == 0) { continue; } @@ -2109,7 +2113,6 @@ FETCH_NEXT_BLOCK: // record the scan action. pInfo->numOfExec++; pOperator->resultInfo.totalRows += pBlockInfo->rows; - // printDataBlock(pInfo->pRes, "stream scan"); qDebug("scan rows: %" PRId64, pBlockInfo->rows); if (pBlockInfo->rows > 0) { diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index f33e126068..f79d84c371 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -253,7 +253,6 @@ int32_t streamExecForAll(SStreamTask* pTask) { while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { -// qDebug("s-task:%s extract data from input queue, queue is empty, abort", pTask->id.idStr); break; } @@ -298,7 +297,7 @@ int32_t streamExecForAll(SStreamTask* pTask) { } SArray* pRes = taosArrayInit(0, sizeof(SSDataBlock)); - qDebug("s-task:%s exec begin, numOfBlocks:%d", pTask->id.idStr, batchSize); + qDebug("s-task:%s start to execute, numOfBlocks:%d", pTask->id.idStr, batchSize); streamTaskExecImpl(pTask, pInput, pRes); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 6154e30938..4cc43a19a0 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -237,6 +237,7 @@ static int32_t walFetchHeadNew(SWalReader *pRead, int64_t fetchVer) { } seeked = true; } + while (1) { contLen = taosReadFile(pRead->pLogFile, pRead->pHead, sizeof(SWalCkHead)); if (contLen == sizeof(SWalCkHead)) { From 3afc5bdfcf0811468ad387d655314ddde5d37934 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Sat, 6 May 2023 10:09:51 +0800 Subject: [PATCH 09/33] refactor: add some logs. --- source/libs/stream/src/streamExec.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index f79d84c371..e85a552d13 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -250,6 +250,8 @@ int32_t streamExecForAll(SStreamTask* pTask) { void* pInput = NULL; // merge multiple input data if possible in the input queue. + qDebug("s-task:%s start to extract data block from inputQ", pTask->id.idStr); + while (1) { SStreamQueueItem* qItem = streamQueueNextItem(pTask->inputQueue); if (qItem == NULL) { From be7aacc43f2dd5042e295981fca023e680183c52 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 6 May 2023 10:24:42 +0800 Subject: [PATCH 10/33] fix: print float with 20 width limit --- tools/shell/src/shellEngine.c | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tools/shell/src/shellEngine.c b/tools/shell/src/shellEngine.c index 5ac32eaad9..0f91bdeeda 100644 --- a/tools/shell/src/shellEngine.c +++ b/tools/shell/src/shellEngine.c @@ -554,7 +554,12 @@ void shellPrintField(const char *val, TAOS_FIELD *field, int32_t width, int32_t if (tsEnableScience) { printf("%*e", width, GET_FLOAT_VAL(val)); } else { - printf("%*.5f", width, GET_FLOAT_VAL(val)); + n = snprintf(buf, TSDB_MAX_BYTES_PER_ROW, "%*.5f", width, GET_FLOAT_VAL(val)); + if (n > TMAX(20, width)) { + printf("%*e", width, GET_FLOAT_VAL(val)); + } else { + printf("%s", buf); + } } break; case TSDB_DATA_TYPE_DOUBLE: From 55eddbfb5eadf004edb519510f7271144baaa4df Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 6 May 2023 10:54:05 +0800 Subject: [PATCH 11/33] fix:[TD-23972] push subscribe msg to vnode even though consumer not change --- source/dnode/mnode/impl/src/mndSubscribe.c | 42 ++++++++------ source/dnode/vnode/src/tq/tq.c | 64 ++++++++++------------ 2 files changed, 53 insertions(+), 53 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 573c60549e..75bc595a2e 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -269,6 +269,18 @@ static void addUnassignedVgroups(SMqRebOutputObj *pOutput, SHashObj *pHash) { } } +static void putNoTransferToOutput(SMqRebOutputObj *pOutput, SMqConsumerEp *pConsumerEp){ + for(int i = 0; i < taosArrayGetSize(pConsumerEp->vgs); i++){ + SMqVgEp *pVgEp = (SMqVgEp *)taosArrayGetP(pConsumerEp->vgs, i); + SMqRebOutputVg outputVg = { + .oldConsumerId = pConsumerEp->consumerId, + .newConsumerId = pConsumerEp->consumerId, + .pVgEp = pVgEp, + }; + taosArrayPush(pOutput->rebVgs, &outputVg); + } +} + static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHash, int32_t minVgCnt, int32_t imbConsumerNum) { const char *pSubKey = pOutput->pSub->key; @@ -290,24 +302,19 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas taosArrayPush(pOutput->modifyConsumers, &pConsumerEp->consumerId); if (consumerVgNum > minVgCnt) { if (imbCnt < imbConsumerNum) { - if (consumerVgNum == minVgCnt + 1) { - imbCnt++; - continue; - } else { - // pop until equal minVg + 1 - while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) { - SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); - SMqRebOutputVg outputVg = { - .oldConsumerId = pConsumerEp->consumerId, - .newConsumerId = -1, - .pVgEp = pVgEp, - }; - taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); - mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId, - pConsumerEp->consumerId); - } - imbCnt++; + // pop until equal minVg + 1 + while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt + 1) { + SMqVgEp *pVgEp = *(SMqVgEp **)taosArrayPop(pConsumerEp->vgs); + SMqRebOutputVg outputVg = { + .oldConsumerId = pConsumerEp->consumerId, + .newConsumerId = -1, + .pVgEp = pVgEp, + }; + taosHashPut(pHash, &pVgEp->vgId, sizeof(int32_t), &outputVg, sizeof(SMqRebOutputVg)); + mInfo("sub:%s mq rebalance remove vgId:%d from consumer:0x%" PRIx64 ",(first scan)", pSubKey, pVgEp->vgId, + pConsumerEp->consumerId); } + imbCnt++; } else { // all the remain consumers should only have the number of vgroups, which is equalled to the value of minVg while (taosArrayGetSize(pConsumerEp->vgs) > minVgCnt) { @@ -323,6 +330,7 @@ static void transferVgroupsForConsumers(SMqRebOutputObj *pOutput, SHashObj *pHas } } } + putNoTransferToOutput(pOutput, pConsumerEp); } } diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 792ff8677e..7004fe0be3 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -445,6 +445,7 @@ int32_t tqProcessDelCheckInfoReq(STQ* pTq, int64_t sversion, char* msg, int32_t } int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msgLen) { + int ret = 0; SMqRebVgReq req = {0}; tDecodeSMqRebVgReq(msg, &req); @@ -463,8 +464,7 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg if (req.newConsumerId == -1) { tqError("vgId:%d, tq invalid re-balance request, new consumerId %" PRId64 "", req.vgId, req.newConsumerId); - taosMemoryFree(req.qmsg); - return 0; + goto end; } STqHandle tqHandle = {0}; @@ -481,8 +481,8 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg // TODO version should be assigned and refed during preprocess SWalRef* pRef = walRefCommittedVer(pVnode->pWal); if (pRef == NULL) { - taosMemoryFree(req.qmsg); - return -1; + ret = -1; + goto end; } int64_t ver = pRef->refVer; @@ -534,49 +534,41 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg taosHashPut(pTq->pHandle, req.subKey, strlen(req.subKey), pHandle, sizeof(STqHandle)); tqDebug("try to persist handle %s consumer:0x%" PRIx64 " , old consumer:0x%" PRIx64, req.subKey, pHandle->consumerId, oldConsumerId); - if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { - taosMemoryFree(req.qmsg); - return -1; - } + ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); + goto end; } else { if (pHandle->consumerId == req.newConsumerId) { // do nothing tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId); - atomic_store_32(&pHandle->epoch, -1); - atomic_add_fetch_32(&pHandle->epoch, 1); - taosMemoryFree(req.qmsg); - return tqMetaSaveHandle(pTq, req.subKey, pHandle); } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); - - // kill executing task - qTaskInfo_t pTaskInfo = pHandle->execHandle.task; - if (pTaskInfo != NULL) { - qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); - } - - taosWLockLatch(&pTq->lock); - atomic_store_32(&pHandle->epoch, 0); - - // remove if it has been register in the push manager, and return one empty block to consumer - tqUnregisterPushHandle(pTq, pHandle); - atomic_store_64(&pHandle->consumerId, req.newConsumerId); - - if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { - qStreamCloseTsdbReader(pTaskInfo); - } - - taosWUnLockLatch(&pTq->lock); - if (tqMetaSaveHandle(pTq, req.subKey, pHandle) < 0) { - taosMemoryFree(req.qmsg); - return -1; - } } + // kill executing task + qTaskInfo_t pTaskInfo = pHandle->execHandle.task; + if (pTaskInfo != NULL) { + qKillTask(pTaskInfo, TSDB_CODE_SUCCESS); + } + + taosWLockLatch(&pTq->lock); + atomic_add_fetch_32(&pHandle->epoch, 1); + + // remove if it has been register in the push manager, and return one empty block to consumer + tqUnregisterPushHandle(pTq, pHandle); + + + if (pHandle->execHandle.subType == TOPIC_SUB_TYPE__COLUMN) { + qStreamCloseTsdbReader(pTaskInfo); + } + + taosWUnLockLatch(&pTq->lock); + ret = tqMetaSaveHandle(pTq, req.subKey, pHandle); + goto end; } +end: taosMemoryFree(req.qmsg); - return 0; + return ret; } int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask, int64_t ver) { From 446097434e5a288929fe8264ac9a56c6294fc06c Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 6 May 2023 11:06:41 +0800 Subject: [PATCH 12/33] fix:[TD-23972] push subscribe msg to vnode even though consumer not change --- source/dnode/vnode/src/tq/tqUtil.c | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index d186c63871..57fd271416 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -165,12 +165,19 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, SRpcMsg* pMsg, STqOffsetVal* pOffset) { uint64_t consumerId = pRequest->consumerId; int32_t vgId = TD_VID(pTq->pVnode); + int code = 0; SMqDataRsp dataRsp = {0}; tqInitDataRsp(&dataRsp, pRequest, pHandle->execHandle.subType); + qTaskInfo_t task = pHandle->execHandle.task; + if(qTaskIsExecuting(task)){ + code = tqSendDataRsp(pTq, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_RSP); + tDeleteSMqDataRsp(&dataRsp); + return code; + } qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); - int code = tqScanData(pTq, pHandle, &dataRsp, pOffset); + code = tqScanData(pTq, pHandle, &dataRsp, pOffset); if(code != 0) { goto end; } From d32097d56a9eeb211befff2d66d34248fa10143a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 6 May 2023 11:26:54 +0800 Subject: [PATCH 13/33] fix:disable subscribeStb.py temporary --- tests/system-test/win-test-file | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file index 7e68c40fd8..214e01f1a8 100644 --- a/tests/system-test/win-test-file +++ b/tests/system-test/win-test-file @@ -279,7 +279,7 @@ python3 ./test.py -f 7-tmq/subscribeDb1.py python3 ./test.py -f 7-tmq/subscribeDb2.py python3 ./test.py -f 7-tmq/subscribeDb3.py python3 ./test.py -f 7-tmq/subscribeDb4.py -python3 ./test.py -f 7-tmq/subscribeStb.py +#python3 ./test.py -f 7-tmq/subscribeStb.py python3 ./test.py -f 7-tmq/subscribeStb0.py python3 ./test.py -f 7-tmq/subscribeStb1.py python3 ./test.py -f 7-tmq/subscribeStb2.py From 6b0580eedfd5a4d0cb9393b092bb40191afd0938 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sat, 6 May 2023 13:30:49 +0800 Subject: [PATCH 14/33] fix: clear null bitmap bit when set value --- source/common/src/tdatablock.c | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index 3558feaa66..b439f0437c 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -120,6 +120,7 @@ int32_t colDataSetVal(SColumnInfoData* pColumnInfoData, uint32_t rowIndex, const pColumnInfoData->varmeta.length += dataLen; } else { memcpy(pColumnInfoData->pData + pColumnInfoData->info.bytes * rowIndex, pData, pColumnInfoData->info.bytes); + colDataClearNull_f(pColumnInfoData->nullbitmap, rowIndex); } return 0; @@ -1857,7 +1858,6 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { return buf; } -#if 0 void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) { SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock*)); taosArrayPush(dataBlocks, &pBlock); @@ -1950,11 +1950,10 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { } } -#endif // for debug char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) { - int32_t size = 2048; + int32_t size = 2048*1024; *pDataBuf = taosMemoryCalloc(size, 1); char* dumpBuf = *pDataBuf; char pBuf[128] = {0}; @@ -1970,7 +1969,7 @@ char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) if (len >= size - 1) return dumpBuf; for (int32_t j = 0; j < rows; j++) { - len += snprintf(dumpBuf + len, size - len, "%s |", flag); + len += snprintf(dumpBuf + len, size - len, "%s %d|", flag, j); if (len >= size - 1) return dumpBuf; for (int32_t k = 0; k < colNum; k++) { From 25170e1a75f0bfed8327ab0eb5f9c51323ce491a Mon Sep 17 00:00:00 2001 From: slzhou Date: Sat, 6 May 2023 13:32:58 +0800 Subject: [PATCH 15/33] fix: restore the removal of show datablock to console --- source/common/src/tdatablock.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/common/src/tdatablock.c b/source/common/src/tdatablock.c index b439f0437c..9c56dd0439 100644 --- a/source/common/src/tdatablock.c +++ b/source/common/src/tdatablock.c @@ -1858,6 +1858,7 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) { return buf; } +#if 0 void blockDebugShowDataBlock(SSDataBlock* pBlock, const char* flag) { SArray* dataBlocks = taosArrayInit(1, sizeof(SSDataBlock*)); taosArrayPush(dataBlocks, &pBlock); @@ -1949,7 +1950,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) { } } } - +#endif // for debug char* dumpBlockData(SSDataBlock* pDataBlock, const char* flag, char** pDataBuf) { From 33d4e0cc677bb9f300a720f2d8f5ed364b47e80b Mon Sep 17 00:00:00 2001 From: Ganlin Zhao Date: Sat, 6 May 2023 15:03:17 +0800 Subject: [PATCH 16/33] fix: fix taosd not exit when press ctrl+c --- source/libs/executor/src/executorInt.c | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/source/libs/executor/src/executorInt.c b/source/libs/executor/src/executorInt.c index f525f6728c..62ab2d9df2 100644 --- a/source/libs/executor/src/executorInt.c +++ b/source/libs/executor/src/executorInt.c @@ -82,7 +82,7 @@ static void extractQualifiedTupleByFilterResult(SSDataBlock* pBlock, const SC static int32_t doSetInputDataBlock(SExprSupp* pExprSup, SSDataBlock* pBlock, int32_t order, int32_t scanFlag, bool createDummyCol); static int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, - SGroupResInfo* pGroupResInfo); + SGroupResInfo* pGroupResInfo, int32_t threshold); SResultRow* getNewResultRow(SDiskbasedBuf* pResultBuf, int32_t* currentPageId, int32_t interBufSize) { SFilePage* pData = NULL; @@ -776,7 +776,7 @@ int32_t finalizeResultRows(SDiskbasedBuf* pBuf, SResultRowPosition* resultRowPos } int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprSupp* pSup, SDiskbasedBuf* pBuf, - SGroupResInfo* pGroupResInfo) { + SGroupResInfo* pGroupResInfo, int32_t threshold) { SExprInfo* pExprInfo = pSup->pExprInfo; int32_t numOfExprs = pSup->numOfExprs; int32_t* rowEntryOffset = pSup->rowEntryInfoOffset; @@ -825,6 +825,9 @@ int32_t doCopyToSDataBlock(SExecTaskInfo* pTaskInfo, SSDataBlock* pBlock, SExprS releaseBufPage(pBuf, page); pBlock->info.rows += pRow->numOfRows; + if (pBlock->info.rows >= threshold) { + break; + } } qDebug("%s result generated, rows:%" PRId64 ", groupId:%" PRIu64, GET_TASKID(pTaskInfo), pBlock->info.rows, @@ -850,7 +853,7 @@ void doBuildStreamResBlock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SGr // clear the existed group id pBlock->info.id.groupId = 0; ASSERT(!pbInfo->mergeResultBlock); - doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); + doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold); void* tbname = NULL; if (streamStateGetParName(pTaskInfo->streamInfo.pState, pBlock->info.id.groupId, &tbname) < 0) { @@ -877,10 +880,10 @@ void doBuildResultDatablock(SOperatorInfo* pOperator, SOptrBasicInfo* pbInfo, SG // clear the existed group id pBlock->info.id.groupId = 0; if (!pbInfo->mergeResultBlock) { - doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); + doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold); } else { while (hasRemainResults(pGroupResInfo)) { - doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo); + doCopyToSDataBlock(pTaskInfo, pBlock, &pOperator->exprSupp, pBuf, pGroupResInfo, pOperator->resultInfo.threshold); if (pBlock->info.rows >= pOperator->resultInfo.threshold) { break; } From fc799bcd19c33939d172172de03e7abce403d2a6 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 6 May 2023 15:06:53 +0800 Subject: [PATCH 17/33] test: fix tsim/db/error1.sim random failed --- tests/script/tsim/db/error1.sim | 18 +++++++++--------- 1 file changed, 9 insertions(+), 9 deletions(-) diff --git a/tests/script/tsim/db/error1.sim b/tests/script/tsim/db/error1.sim index 32dbe826cc..d275dca387 100644 --- a/tests/script/tsim/db/error1.sim +++ b/tests/script/tsim/db/error1.sim @@ -56,18 +56,18 @@ endi if $data23 != 0 then return -1 -endi +end -print ========== stop dnode2 -system sh/exec.sh -n dnode2 -s stop -x SIGKILL +#print ========== stop dnode2 +#system sh/exec.sh -n dnode2 -s stop -x SIGKILL -sleep 1000 -print =============== drop database -sql_error drop database d1 +#sleep 1000 +#print =============== drop database +sql drop database d1 -print ========== start dnode2 -system sh/exec.sh -n dnode2 -s start -sleep 1000 +#print ========== start dnode2 +#system sh/exec.sh -n dnode2 -s start +#sleep 1000 print =============== re-create database $x = 0 From b925342ad06a1d0433b19afb5eacd741957001d1 Mon Sep 17 00:00:00 2001 From: Alex Duan <417921451@qq.com> Date: Sat, 6 May 2023 15:08:33 +0800 Subject: [PATCH 18/33] test: fix tsim/db/error1.sim random failed --- tests/script/tsim/db/error1.sim | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/script/tsim/db/error1.sim b/tests/script/tsim/db/error1.sim index d275dca387..64b17125aa 100644 --- a/tests/script/tsim/db/error1.sim +++ b/tests/script/tsim/db/error1.sim @@ -56,7 +56,7 @@ endi if $data23 != 0 then return -1 -end +endi #print ========== stop dnode2 #system sh/exec.sh -n dnode2 -s stop -x SIGKILL From c8ac3a581ec53a26f6edb35bddd1fd6cc7f91b54 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sat, 6 May 2023 15:50:38 +0800 Subject: [PATCH 19/33] fix: forbid some stream computing --- source/libs/parser/src/parTranslater.c | 17 ++++++++++++++++- source/libs/parser/test/parInitialCTest.cpp | 4 ++++ 2 files changed, 20 insertions(+), 1 deletion(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index b598fffbc6..3b8dd80f55 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6118,17 +6118,32 @@ static bool isEventWindowQuery(SSelectStmt* pSelect) { return NULL != pSelect->pWindow && QUERY_NODE_EVENT_WINDOW == nodeType(pSelect->pWindow); } +static bool hasJsonTypeProjection(SSelectStmt* pSelect) { + SNode* pProj = NULL; + FOREACH(pProj, pSelect->pProjectionList) { + if (TSDB_DATA_TYPE_JSON == ((SExprNode*)pProj)->resType.type) { + return true; + } + } + return false; +} + static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || !pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect) || NULL != pSelect->pOrderByList || - crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect)) { + crossTableWithUdaf(pSelect) || isEventWindowQuery(pSelect) || hasJsonTypeProjection(pSelect)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); } if (NULL != pSelect->pSubtable && TSDB_DATA_TYPE_VARCHAR != ((SExprNode*)pSelect->pSubtable)->resType.type) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "SUBTABLE expression must be of VARCHAR type"); } + if (NULL != pSelect->pSubtable && NULL == pSelect->pPartitionByList && nodesExprHasColumn(pSelect->pSubtable)) { + return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, + "SUBTABLE expression must not has column when no partition by clause"); + } + if (NULL == pSelect->pWindow && STREAM_TRIGGER_AT_ONCE != pStmt->pOptions->triggerType) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "The trigger mode of non window query can only be AT_ONCE"); diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp index b7ca944ebb..d79ad04c74 100644 --- a/source/libs/parser/test/parInitialCTest.cpp +++ b/source/libs/parser/test/parInitialCTest.cpp @@ -920,6 +920,10 @@ TEST_F(ParserInitialCTest, createStreamSemanticCheck) { run("CREATE STREAM s1 INTO st1 AS SELECT PERCENTILE(c1, 30) FROM t1 INTERVAL(10S)", TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC); + run("CREATE STREAM s2 INTO st1 AS SELECT ts, to_json("{c1:1}") FROM st1 PARTITION BY TBNAME", + TSDB_CODE_PAR_INVALID_STREAM_QUERY); + run("CREATE STREAM s3 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) " + "AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 INTERVAL(10S)", TSDB_CODE_PAR_INVALID_STREAM_QUERY); } /* From 83c857a017eaec512c2afaa402c34887cf793159 Mon Sep 17 00:00:00 2001 From: shenglian zhou Date: Sat, 6 May 2023 16:17:34 +0800 Subject: [PATCH 20/33] fix: fix compilation error --- source/libs/parser/test/parInitialCTest.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp index d79ad04c74..902a2eaf85 100644 --- a/source/libs/parser/test/parInitialCTest.cpp +++ b/source/libs/parser/test/parInitialCTest.cpp @@ -920,7 +920,7 @@ TEST_F(ParserInitialCTest, createStreamSemanticCheck) { run("CREATE STREAM s1 INTO st1 AS SELECT PERCENTILE(c1, 30) FROM t1 INTERVAL(10S)", TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC); - run("CREATE STREAM s2 INTO st1 AS SELECT ts, to_json("{c1:1}") FROM st1 PARTITION BY TBNAME", + run("CREATE STREAM s2 INTO st1 AS SELECT ts, to_json('{c1:1}') FROM st1 PARTITION BY TBNAME", TSDB_CODE_PAR_INVALID_STREAM_QUERY); run("CREATE STREAM s3 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) " "AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 INTERVAL(10S)", TSDB_CODE_PAR_INVALID_STREAM_QUERY); From ee32620808441e640ac22b5d65769a8323553754 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 6 May 2023 16:22:30 +0800 Subject: [PATCH 21/33] fix:[TS-3347]set ver to first version if version stored is smaller than first version in wal when subscribe db --- include/libs/executor/executor.h | 2 ++ source/dnode/vnode/src/tq/tqUtil.c | 1 + source/libs/executor/src/executor.c | 15 +++++++++------ 3 files changed, 12 insertions(+), 6 deletions(-) diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index 1fb00e743f..b7e6c42e3b 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -190,6 +190,8 @@ STimeWindow getAlignQueryTimeWindow(SInterval* pInterval, int32_t precision, int SArray* qGetQueriedTableListInfo(qTaskInfo_t tinfo); +void verifyOffset(void *pWalReader, STqOffsetVal* pOffset); + int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType); void qStreamSetOpen(qTaskInfo_t tinfo); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 133c51a8dc..dba363122a 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -246,6 +246,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (offset->type == TMQ_OFFSET__LOG) { + verifyOffset(pHandle->pWalReader, offset); int64_t fetchVer = offset->version + 1; pCkHead = taosMemoryMalloc(sizeof(SWalCkHead) + 2048); if (pCkHead == NULL) { diff --git a/source/libs/executor/src/executor.c b/source/libs/executor/src/executor.c index 5fc079b7c1..1c87619e84 100644 --- a/source/libs/executor/src/executor.c +++ b/source/libs/executor/src/executor.c @@ -1058,6 +1058,14 @@ void qStreamSetOpen(qTaskInfo_t tinfo) { pOperator->status = OP_NOT_OPENED; } +void verifyOffset(void *pWalReader, STqOffsetVal* pOffset){ + // if offset version is small than first version , let's seek to first version + int64_t firstVer = walGetFirstVer(((SWalReader*)pWalReader)->pWal); + if (pOffset->version + 1 < firstVer){ + pOffset->version = firstVer - 1; + } +} + int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subType) { SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo; SOperatorInfo* pOperator = pTaskInfo->pRoot; @@ -1083,12 +1091,7 @@ int32_t qStreamPrepareScan(qTaskInfo_t tinfo, STqOffsetVal* pOffset, int8_t subT tsdbReaderClose(pScanBaseInfo->dataReader); pScanBaseInfo->dataReader = NULL; - // let's seek to the next version in wal file - int64_t firstVer = walGetFirstVer(pInfo->tqReader->pWalReader->pWal); - if (pOffset->version + 1 < firstVer){ - pOffset->version = firstVer - 1; - } - + verifyOffset(pInfo->tqReader->pWalReader, pOffset); if (tqSeekVer(pInfo->tqReader, pOffset->version + 1, id) < 0) { qError("tqSeekVer failed ver:%" PRId64 ", %s", pOffset->version + 1, id); return -1; From 276e5daa8cd909db51aec77630a0c72f621422db Mon Sep 17 00:00:00 2001 From: slzhou Date: Sat, 6 May 2023 17:13:14 +0800 Subject: [PATCH 22/33] fix: pseudo column function are treated as variable --- source/libs/parser/src/parTranslater.c | 20 +++++++++++++++++++- source/libs/parser/test/parInitialCTest.cpp | 2 +- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 3b8dd80f55..492b2a5349 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6128,6 +6128,24 @@ static bool hasJsonTypeProjection(SSelectStmt* pSelect) { return false; } +static EDealRes hasVariable(SNode* pNode, void* pContext) { + if (QUERY_NODE_COLUMN == nodeType(pNode)) { + *(bool*)pContext = true; + return DEAL_RES_END; + } + if (QUERY_NODE_FUNCTION == nodeType(pNode) && fmIsPseudoColumnFunc(((SFunctionNode*)pNode)->funcId)) { + *(bool*)pContext = true; + return DEAL_RES_END; + } + return DEAL_RES_CONTINUE; +} + +static int32_t subtableExprHasVariable(SNode* pNode) { + bool hasCol = false; + nodesWalkExprPostOrder(pNode, hasVariable, &hasCol); + return hasCol; +} + static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type || @@ -6139,7 +6157,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "SUBTABLE expression must be of VARCHAR type"); } - if (NULL != pSelect->pSubtable && NULL == pSelect->pPartitionByList && nodesExprHasColumn(pSelect->pSubtable)) { + if (NULL != pSelect->pSubtable && 0 == LIST_LENGTH(pSelect->pPartitionByList) && subtableExprHasVariable(pSelect->pSubtable)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "SUBTABLE expression must not has column when no partition by clause"); } diff --git a/source/libs/parser/test/parInitialCTest.cpp b/source/libs/parser/test/parInitialCTest.cpp index 902a2eaf85..6a08193a39 100644 --- a/source/libs/parser/test/parInitialCTest.cpp +++ b/source/libs/parser/test/parInitialCTest.cpp @@ -922,7 +922,7 @@ TEST_F(ParserInitialCTest, createStreamSemanticCheck) { TSDB_CODE_PAR_STREAM_NOT_ALLOWED_FUNC); run("CREATE STREAM s2 INTO st1 AS SELECT ts, to_json('{c1:1}') FROM st1 PARTITION BY TBNAME", TSDB_CODE_PAR_INVALID_STREAM_QUERY); - run("CREATE STREAM s3 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tname)) " + run("CREATE STREAM s3 INTO st3 TAGS(tname VARCHAR(10), id INT) SUBTABLE(CONCAT('new-', tbname)) " "AS SELECT _WSTART wstart, COUNT(*) cnt FROM st1 INTERVAL(10S)", TSDB_CODE_PAR_INVALID_STREAM_QUERY); } From 04e9648d8ebab33db928740e9ecd6f8f0a455621 Mon Sep 17 00:00:00 2001 From: slzhou Date: Sat, 6 May 2023 17:16:27 +0800 Subject: [PATCH 23/33] enhance: change variable name --- source/libs/parser/src/parTranslater.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 492b2a5349..874ad1c695 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6141,9 +6141,9 @@ static EDealRes hasVariable(SNode* pNode, void* pContext) { } static int32_t subtableExprHasVariable(SNode* pNode) { - bool hasCol = false; - nodesWalkExprPostOrder(pNode, hasVariable, &hasCol); - return hasCol; + bool hasVar = false; + nodesWalkExprPostOrder(pNode, hasVariable, &hasVar); + return hasVar; } static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { From 279fe0803f720e2d3ff09ae9a697ff01faebf4e9 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 6 May 2023 18:35:24 +0800 Subject: [PATCH 24/33] fix:[TD-23972] push subscribe msg to vnode even though consumer not change --- source/dnode/mnode/impl/src/mndSubscribe.c | 8 ++++---- source/dnode/vnode/src/tq/tq.c | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndSubscribe.c b/source/dnode/mnode/impl/src/mndSubscribe.c index 75bc595a2e..b6ab7df68c 100644 --- a/source/dnode/mnode/impl/src/mndSubscribe.c +++ b/source/dnode/mnode/impl/src/mndSubscribe.c @@ -133,10 +133,10 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri static int32_t mndPersistSubChangeVgReq(SMnode *pMnode, STrans *pTrans, const SMqSubscribeObj *pSub, const SMqRebOutputVg *pRebVg) { - if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { - terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; - return -1; - } +// if (pRebVg->oldConsumerId == pRebVg->newConsumerId) { +// terrno = TSDB_CODE_MND_INVALID_SUB_OPTION; +// return -1; +// } void *buf; int32_t tlen; diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 7004fe0be3..1661bb4c21 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -539,10 +539,13 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } else { if (pHandle->consumerId == req.newConsumerId) { // do nothing tqInfo("vgId:%d consumer:0x%" PRIx64 " remains, no switch occurs", req.vgId, req.newConsumerId); + atomic_add_fetch_32(&pHandle->epoch, 1); + } else { tqInfo("vgId:%d switch consumer from Id:0x%" PRIx64 " to Id:0x%" PRIx64, req.vgId, pHandle->consumerId, req.newConsumerId); atomic_store_64(&pHandle->consumerId, req.newConsumerId); + atomic_store_32(&pHandle->epoch, 0); } // kill executing task qTaskInfo_t pTaskInfo = pHandle->execHandle.task; @@ -551,8 +554,6 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg } taosWLockLatch(&pTq->lock); - atomic_add_fetch_32(&pHandle->epoch, 1); - // remove if it has been register in the push manager, and return one empty block to consumer tqUnregisterPushHandle(pTq, pHandle); From 261f2736f7573203ddfd9444e249ccbaef70a660 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sat, 6 May 2023 19:17:36 +0800 Subject: [PATCH 25/33] fix:[TD-24010] subscribeStb.py failed in windows CI --- source/libs/wal/src/walRead.c | 2 +- tests/system-test/win-test-file | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 4cc43a19a0..844ad89b6c 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -75,7 +75,7 @@ int32_t walNextValidMsg(SWalReader *pReader) { int64_t committedVer = walGetCommittedVer(pReader->pWal); int64_t appliedVer = walGetAppliedVer(pReader->pWal); int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer; - endVer = TMIN(appliedVer, endVer); +// endVer = TMIN(appliedVer, endVer); wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 ", applied index:%" PRId64 ", end index:%" PRId64, diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file index 214e01f1a8..7e68c40fd8 100644 --- a/tests/system-test/win-test-file +++ b/tests/system-test/win-test-file @@ -279,7 +279,7 @@ python3 ./test.py -f 7-tmq/subscribeDb1.py python3 ./test.py -f 7-tmq/subscribeDb2.py python3 ./test.py -f 7-tmq/subscribeDb3.py python3 ./test.py -f 7-tmq/subscribeDb4.py -#python3 ./test.py -f 7-tmq/subscribeStb.py +python3 ./test.py -f 7-tmq/subscribeStb.py python3 ./test.py -f 7-tmq/subscribeStb0.py python3 ./test.py -f 7-tmq/subscribeStb1.py python3 ./test.py -f 7-tmq/subscribeStb2.py From a425f085040d1aab329d543c3336d939b67f897f Mon Sep 17 00:00:00 2001 From: dmchen Date: Sat, 6 May 2023 19:53:57 +0800 Subject: [PATCH 26/33] use wrong usedb map --- source/dnode/mnode/impl/src/mndUser.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndUser.c b/source/dnode/mnode/impl/src/mndUser.c index 523753d7c6..2a0d753722 100644 --- a/source/dnode/mnode/impl/src/mndUser.c +++ b/source/dnode/mnode/impl/src/mndUser.c @@ -236,7 +236,7 @@ SSdbRaw *mndUserActionEncode(SUserObj *pUser) { SDB_SET_BINARY(pRaw, dataPos, key, keyLen, _OVER); SDB_SET_INT32(pRaw, dataPos, *useDb, _OVER) - useDb = taosHashIterate(pUser->writeTbs, useDb); + useDb = taosHashIterate(pUser->useDbs, useDb); } SDB_SET_RESERVE(pRaw, dataPos, USER_RESERVE_SIZE, _OVER) From cbdddeffec83c494bb340277f694aa5abf3883d3 Mon Sep 17 00:00:00 2001 From: Adam Ji Date: Mon, 8 May 2023 10:41:22 +0800 Subject: [PATCH 27/33] docs: add wal args --- docs/examples/rust/nativeexample/examples/subscribe_demo.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs index 7551ad46b1..d54bb60e93 100644 --- a/docs/examples/rust/nativeexample/examples/subscribe_demo.rs +++ b/docs/examples/rust/nativeexample/examples/subscribe_demo.rs @@ -45,7 +45,7 @@ async fn main() -> anyhow::Result<()> { taos.exec_many([ format!("DROP TOPIC IF EXISTS tmq_meters"), format!("DROP DATABASE IF EXISTS `{db}`"), - format!("CREATE DATABASE `{db}`"), + format!("CREATE DATABASE `{db}` WAL_RETENTION_PERIOD 3600"), format!("USE `{db}`"), // create super table format!("CREATE TABLE `meters` (`ts` TIMESTAMP, `current` FLOAT, `voltage` INT, `phase` FLOAT) TAGS (`groupid` INT, `location` BINARY(24))"), From 4eed65432c9970a04a563f2b286eb0c8dd73995c Mon Sep 17 00:00:00 2001 From: dmchen Date: Mon, 8 May 2023 13:26:11 +0800 Subject: [PATCH 28/33] colid overflow --- source/dnode/mnode/impl/src/mndStb.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndStb.c b/source/dnode/mnode/impl/src/mndStb.c index a73c08e69a..8b708c3e0f 100644 --- a/source/dnode/mnode/impl/src/mndStb.c +++ b/source/dnode/mnode/impl/src/mndStb.c @@ -932,7 +932,7 @@ static int32_t mndBuildStbFromAlter(SStbObj *pStb, SStbObj *pDst, SMCreateStbReq return -1; } - if(pDst->nextColId < 0 && pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){ + if(pDst->nextColId < 0 || pDst->nextColId >= 0x7fff - pDst->numOfColumns - pDst->numOfTags){ terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW; return -1; } @@ -1163,8 +1163,8 @@ static int32_t mndAddSuperTableTag(const SStbObj *pOld, SStbObj *pNew, SArray *p if (mndAllocStbSchemas(pOld, pNew) != 0) { return -1; } - - if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ntags){ + + if(pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ntags){ terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW; return -1; } @@ -1476,7 +1476,7 @@ static int32_t mndAddSuperTableColumn(const SStbObj *pOld, SStbObj *pNew, SArray return -1; } - if(pNew->nextColId < 0 && pNew->nextColId >= 0x7fff - ncols){ + if(pNew->nextColId < 0 || pNew->nextColId >= 0x7fff - ncols){ terrno = TSDB_CODE_MND_FIELD_VALUE_OVERFLOW; return -1; } From 2f45a3dfdbcb41fcab26becd1a6384e9d1a29007 Mon Sep 17 00:00:00 2001 From: slzhou Date: Mon, 8 May 2023 14:42:31 +0800 Subject: [PATCH 29/33] fix: change variable name --- source/libs/parser/src/parTranslater.c | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 874ad1c695..e753c08f9d 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -6128,7 +6128,7 @@ static bool hasJsonTypeProjection(SSelectStmt* pSelect) { return false; } -static EDealRes hasVariable(SNode* pNode, void* pContext) { +static EDealRes hasColumnOrPseudoColumn(SNode* pNode, void* pContext) { if (QUERY_NODE_COLUMN == nodeType(pNode)) { *(bool*)pContext = true; return DEAL_RES_END; @@ -6140,10 +6140,10 @@ static EDealRes hasVariable(SNode* pNode, void* pContext) { return DEAL_RES_CONTINUE; } -static int32_t subtableExprHasVariable(SNode* pNode) { - bool hasVar = false; - nodesWalkExprPostOrder(pNode, hasVariable, &hasVar); - return hasVar; +static int32_t subtableExprHasColumnOrPseudoColumn(SNode* pNode) { + bool hasColumn = false; + nodesWalkExprPostOrder(pNode, hasColumnOrPseudoColumn, &hasColumn); + return hasColumn; } static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { @@ -6157,7 +6157,7 @@ static int32_t checkStreamQuery(STranslateContext* pCxt, SCreateStreamStmt* pStm return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "SUBTABLE expression must be of VARCHAR type"); } - if (NULL != pSelect->pSubtable && 0 == LIST_LENGTH(pSelect->pPartitionByList) && subtableExprHasVariable(pSelect->pSubtable)) { + if (NULL != pSelect->pSubtable && 0 == LIST_LENGTH(pSelect->pPartitionByList) && subtableExprHasColumnOrPseudoColumn(pSelect->pSubtable)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "SUBTABLE expression must not has column when no partition by clause"); } From f9142c0ddb8eba709cec30a2a4bf0fd9f954d8a3 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 8 May 2023 16:49:56 +0800 Subject: [PATCH 30/33] fix:[TD24010] lost data if apply ver is small than commit ver --- include/libs/wal/wal.h | 2 +- source/client/src/clientSml.c | 4 +++- source/libs/wal/src/walRead.c | 15 ++++++++++----- 3 files changed, 14 insertions(+), 7 deletions(-) diff --git a/include/libs/wal/wal.h b/include/libs/wal/wal.h index d3e2bbb1be..46dc179295 100644 --- a/include/libs/wal/wal.h +++ b/include/libs/wal/wal.h @@ -132,7 +132,7 @@ typedef struct { } SWalRef; typedef struct { - int8_t scanUncommited; +// int8_t scanUncommited; int8_t scanNotApplied; int8_t scanMeta; int8_t enableRef; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 19d08ad66d..ce06e0eac4 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1580,7 +1580,9 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL code = smlModifyDBSchemas(info); if (code == 0 || code == TSDB_CODE_SML_INVALID_DATA || code == TSDB_CODE_PAR_TOO_MANY_COLUMNS || code == TSDB_CODE_PAR_INVALID_TAGS_NUM || code == TSDB_CODE_PAR_INVALID_TAGS_LENGTH - || code == TSDB_CODE_PAR_INVALID_ROW_LENGTH) break; + || code == TSDB_CODE_PAR_INVALID_ROW_LENGTH || code == TSDB_CODE_MND_FIELD_VALUE_OVERFLOW) { + break; + } taosMsleep(100); uInfo("SML:0x%" PRIx64 " smlModifyDBSchemas retry code:%s, times:%d", info->id, tstrerror(code), retryNum); } while (retryNum++ < taosHashGetSize(info->superTables) * MAX_RETRY_TIMES); diff --git a/source/libs/wal/src/walRead.c b/source/libs/wal/src/walRead.c index 844ad89b6c..b29e36efdc 100644 --- a/source/libs/wal/src/walRead.c +++ b/source/libs/wal/src/walRead.c @@ -37,7 +37,7 @@ SWalReader *walOpenReader(SWal *pWal, SWalFilterCond *cond) { if (cond) { pReader->cond = *cond; } else { - pReader->cond.scanUncommited = 0; +// pReader->cond.scanUncommited = 0; pReader->cond.scanNotApplied = 0; pReader->cond.scanMeta = 0; pReader->cond.enableRef = 0; @@ -74,13 +74,18 @@ int32_t walNextValidMsg(SWalReader *pReader) { int64_t lastVer = walGetLastVer(pReader->pWal); int64_t committedVer = walGetCommittedVer(pReader->pWal); int64_t appliedVer = walGetAppliedVer(pReader->pWal); - int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer; + while(appliedVer < committedVer){ // wait apply ver equal to commit ver, otherwise may lost data when consume data [TD-24010] + wDebug("vgId:%d, wal apply ver:%"PRId64" smaller than commit ver:%"PRId64", so sleep 1ms", pReader->pWal->cfg.vgId, appliedVer, committedVer); + taosMsleep(1); + appliedVer = walGetAppliedVer(pReader->pWal); + } +// int64_t endVer = pReader->cond.scanUncommited ? lastVer : committedVer; // endVer = TMIN(appliedVer, endVer); wDebug("vgId:%d, wal start to fetch, index:%" PRId64 ", last index:%" PRId64 " commit index:%" PRId64 - ", applied index:%" PRId64 ", end index:%" PRId64, - pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer, endVer); - while (fetchVer <= endVer) { + ", applied index:%" PRId64, + pReader->pWal->cfg.vgId, fetchVer, lastVer, committedVer, appliedVer); + while (fetchVer <= committedVer) { if (walFetchHeadNew(pReader, fetchVer) < 0) { return -1; } From bd5372c3782b3b6b04295f54a295151ed5b7282d Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 8 May 2023 17:02:51 +0800 Subject: [PATCH 31/33] fix: protect against nullptr in syncRespCleanRsp --- source/libs/sync/src/syncRespMgr.c | 2 ++ 1 file changed, 2 insertions(+) diff --git a/source/libs/sync/src/syncRespMgr.c b/source/libs/sync/src/syncRespMgr.c index f9f14c2e00..3506d477d3 100644 --- a/source/libs/sync/src/syncRespMgr.c +++ b/source/libs/sync/src/syncRespMgr.c @@ -171,6 +171,8 @@ static void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl, bool rsp) { } void syncRespCleanRsp(SSyncRespMgr *pObj) { + if (pObj == NULL) return; + SSyncNode *pNode = pObj->data; sTrace("vgId:%d, clean all resp", pNode->vgId); From c3e0de0bfce800c13fecdd5c761ab639e23d0d02 Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Mon, 8 May 2023 09:21:20 +0000 Subject: [PATCH 32/33] fix invalid read --- source/libs/transport/src/transCli.c | 29 ++++++++++++++-------------- source/libs/transport/src/transSvr.c | 2 +- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/source/libs/transport/src/transCli.c b/source/libs/transport/src/transCli.c index c23d6d0a1f..ea35f1cfe5 100644 --- a/source/libs/transport/src/transCli.c +++ b/source/libs/transport/src/transCli.c @@ -587,12 +587,12 @@ void* destroyConnPool(SCliThrd* pThrd) { static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { void* pool = pThrd->pool; - SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1); STrans* pTranInst = pThrd->pTransInst; if (plist == NULL) { SConnList list = {0}; - taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); - plist = taosHashGet(pool, key, strlen(key)); + taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list)); + plist = taosHashGet(pool, key, strlen(key) + 1); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); QUEUE_INIT(&nList->msgQ); @@ -627,11 +627,11 @@ static SCliConn* getConnFromPool(SCliThrd* pThrd, char* key, bool* exceed) { static SCliConn* getConnFromPool2(SCliThrd* pThrd, char* key, SCliMsg** pMsg) { void* pool = pThrd->pool; STrans* pTransInst = pThrd->pTransInst; - SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key)); + SConnList* plist = taosHashGet((SHashObj*)pool, key, strlen(key) + 1); if (plist == NULL) { SConnList list = {0}; - taosHashPut((SHashObj*)pool, key, strlen(key), (void*)&list, sizeof(list)); - plist = taosHashGet(pool, key, strlen(key)); + taosHashPut((SHashObj*)pool, key, strlen(key) + 1, (void*)&list, sizeof(list)); + plist = taosHashGet(pool, key, strlen(key) + 1); SMsgList* nList = taosMemoryCalloc(1, sizeof(SMsgList)); QUEUE_INIT(&nList->msgQ); @@ -717,7 +717,7 @@ static void addConnToPool(void* pool, SCliConn* conn) { cliDestroyConnMsgs(conn, false); if (conn->list == NULL) { - conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip)); + conn->list = taosHashGet((SHashObj*)pool, conn->ip, strlen(conn->ip) + 1); } SConnList* pList = conn->list; @@ -822,7 +822,8 @@ static void cliRecvCb(uv_stream_t* handle, ssize_t nread, const uv_buf_t* buf) { return; } if (nread < 0) { - tWarn("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), T_REF_VAL_GET(conn)); + tDebug("%s conn %p read error:%s, ref:%d", CONN_GET_INST_LABEL(conn), conn, uv_err_name(nread), + T_REF_VAL_GET(conn)); conn->broken = true; cliHandleExcept(conn); } @@ -875,8 +876,8 @@ static void cliDestroyConn(SCliConn* conn, bool clear) { connList->list->numOfConn--; connList->size--; } else { - SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip)); - connList->list->numOfConn--; + SConnList* connList = taosHashGet((SHashObj*)pThrd->pool, conn->ip, strlen(conn->ip) + 1); + if (connList != NULL) connList->list->numOfConn--; } conn->list = NULL; pThrd->newConnCount--; @@ -1269,7 +1270,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { if (pMsg != NULL && REQUEST_NO_RESP(&pMsg->msg) && (pTransInst->failFastFp != NULL && pTransInst->failFastFp(pMsg->msg.msgType))) { - SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip)); + SFailFastItem* item = taosHashGet(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1); int64_t cTimestamp = taosGetTimestampMs(); if (item != NULL) { int32_t elapse = cTimestamp - item->timestamp; @@ -1281,7 +1282,7 @@ static void cliHandleFastFail(SCliConn* pConn, int status) { } } else { SFailFastItem item = {.count = 1, .timestamp = cTimestamp}; - taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip), &item, sizeof(SFailFastItem)); + taosHashPut(pThrd->failFastCache, pConn->ip, strlen(pConn->ip) + 1, &item, sizeof(SFailFastItem)); } } } else { @@ -1459,7 +1460,7 @@ FORCE_INLINE int32_t cliBuildExceptResp(SCliMsg* pMsg, STransMsg* pResp) { } static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) { uint32_t addr = 0; - uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn)); + uint32_t* v = taosHashGet(cache, fqdn, strlen(fqdn) + 1); if (v == NULL) { addr = taosGetIpv4FromFqdn(fqdn); if (addr == 0xffffffff) { @@ -1468,7 +1469,7 @@ static FORCE_INLINE uint32_t cliGetIpFromFqdnCache(SHashObj* cache, char* fqdn) return addr; } - taosHashPut(cache, fqdn, strlen(fqdn), &addr, sizeof(addr)); + taosHashPut(cache, fqdn, strlen(fqdn) + 1, &addr, sizeof(addr)); } else { addr = *v; } diff --git a/source/libs/transport/src/transSvr.c b/source/libs/transport/src/transSvr.c index 28fb474972..269c7ecf9b 100644 --- a/source/libs/transport/src/transSvr.c +++ b/source/libs/transport/src/transSvr.c @@ -314,7 +314,7 @@ void uvOnRecvCb(uv_stream_t* cli, ssize_t nread, const uv_buf_t* buf) { return; } - tWarn("%s conn %p read error:%s", transLabel(pTransInst), conn, uv_err_name(nread)); + tDebug("%s conn %p read error:%s", transLabel(pTransInst), conn, uv_err_name(nread)); if (nread < 0) { conn->broken = true; if (conn->status == ConnAcquire) { From e22c62ffbacf687fe0fab8e0104626b806183d8b Mon Sep 17 00:00:00 2001 From: wangjiaming0909 <604227650@qq.com> Date: Mon, 8 May 2023 18:13:37 +0800 Subject: [PATCH 33/33] fix: data compare of signed and unsigned integers --- source/libs/scalar/test/CMakeLists.txt | 2 +- .../libs/scalar/test/filter/filterTests.cpp | 123 ++++++++++++++++++ source/util/src/tcompare.c | 70 ++++++---- 3 files changed, 166 insertions(+), 29 deletions(-) diff --git a/source/libs/scalar/test/CMakeLists.txt b/source/libs/scalar/test/CMakeLists.txt index 32f5e098c5..caaf86264c 100644 --- a/source/libs/scalar/test/CMakeLists.txt +++ b/source/libs/scalar/test/CMakeLists.txt @@ -1,4 +1,4 @@ enable_testing() -#add_subdirectory(filter) +add_subdirectory(filter) add_subdirectory(scalar) diff --git a/source/libs/scalar/test/filter/filterTests.cpp b/source/libs/scalar/test/filter/filterTests.cpp index b59e89fe0d..51ee9b6570 100644 --- a/source/libs/scalar/test/filter/filterTests.cpp +++ b/source/libs/scalar/test/filter/filterTests.cpp @@ -33,6 +33,7 @@ #include "os.h" #include "filter.h" +#include "filterInt.h" #include "nodes.h" #include "scalar.h" #include "stub.h" @@ -344,6 +345,7 @@ TEST(timerangeTest, greater_and_lower_not_strict) { nodesDestroyNode(logicNode1); } +#if 0 TEST(columnTest, smallint_column_greater_double_value) { SNode *pLeft = NULL, *pRight = NULL, *opNode = NULL; int16_t leftv[5] = {1, 2, 3, 4, 5}; @@ -1337,6 +1339,127 @@ TEST(scalarModelogicTest, diff_columns_or_and_or) { nodesDestroyNode(logicNode1); blockDataDestroy(src); } +#endif + +template +int32_t compareSignedWithUnsigned(SignedT l, UnsignedT r) { + if (l < 0) return -1; + auto l_uint64 = static_cast(l); + auto r_uint64 = static_cast(r); + if (l_uint64 < r_uint64) return -1; + if (l_uint64 > r_uint64) return 1; + return 0; +} + +template +int32_t compareUnsignedWithSigned(UnsignedT l, SignedT r) { + if (r < 0) return 1; + auto l_uint64 = static_cast(l); + auto r_uint64 = static_cast(r); + if (l_uint64 < r_uint64) return -1; + if (l_uint64 > r_uint64) return 1; + return 0; +} + +template +void doCompareWithValueRange_SignedWithUnsigned(__compar_fn_t fp) { + int32_t signedMin = -10, signedMax = 10; + int32_t unsignedMin = 0, unsignedMax = 10; + for (SignedT l = signedMin; l <= signedMax; ++l) { + for (UnsignedT r = unsignedMin; r <= unsignedMax; ++r) { + ASSERT_EQ(fp(&l, &r), compareSignedWithUnsigned(l, r)); + } + } +} + +template +void doCompareWithValueRange_UnsignedWithSigned(__compar_fn_t fp) { + int32_t signedMin = -10, signedMax = 10; + int32_t unsignedMin = 0, unsignedMax = 10; + for (UnsignedT l = unsignedMin; l <= unsignedMax; ++l) { + for (SignedT r = signedMin; r <= signedMax; ++r) { + ASSERT_EQ(fp(&l, &r), compareUnsignedWithSigned(l, r)); + } + } +} + +template +void doCompareWithValueRange_OnlyLeftType(__compar_fn_t fp, int32_t rType) { + switch (rType) { + case TSDB_DATA_TYPE_UTINYINT: + doCompareWithValueRange_SignedWithUnsigned(fp); + break; + case TSDB_DATA_TYPE_USMALLINT: + doCompareWithValueRange_SignedWithUnsigned(fp); + break; + case TSDB_DATA_TYPE_UINT: + doCompareWithValueRange_SignedWithUnsigned(fp); + break; + case TSDB_DATA_TYPE_UBIGINT: + doCompareWithValueRange_SignedWithUnsigned(fp); + break; + case TSDB_DATA_TYPE_TINYINT: + doCompareWithValueRange_UnsignedWithSigned(fp); + break; + case TSDB_DATA_TYPE_SMALLINT: + doCompareWithValueRange_UnsignedWithSigned(fp); + break; + case TSDB_DATA_TYPE_INT: + doCompareWithValueRange_UnsignedWithSigned(fp); + break; + case TSDB_DATA_TYPE_BIGINT: + doCompareWithValueRange_UnsignedWithSigned(fp); + break; + default: + FAIL(); + } +} + +void doCompare(const std::vector &lTypes, const std::vector &rTypes, int32_t oper) { + for (int i = 0; i < lTypes.size(); ++i) { + for (int j = 0; j < rTypes.size(); ++j) { + auto fp = filterGetCompFuncEx(lTypes[i], rTypes[j], oper); + switch (lTypes[i]) { + case TSDB_DATA_TYPE_TINYINT: + doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]); + break; + case TSDB_DATA_TYPE_SMALLINT: + doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]); + break; + case TSDB_DATA_TYPE_INT: + doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]); + break; + case TSDB_DATA_TYPE_BIGINT: + doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]); + break; + case TSDB_DATA_TYPE_UTINYINT: + doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]); + break; + case TSDB_DATA_TYPE_USMALLINT: + doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]); + break; + case TSDB_DATA_TYPE_UINT: + doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]); + break; + case TSDB_DATA_TYPE_UBIGINT: + doCompareWithValueRange_OnlyLeftType(fp, rTypes[j]); + break; + default: + FAIL(); + } + } + } +} + +TEST(dataCompareTest, signed_and_unsigned_int) { + std::vector lType = {TSDB_DATA_TYPE_TINYINT, TSDB_DATA_TYPE_SMALLINT, TSDB_DATA_TYPE_INT, + TSDB_DATA_TYPE_BIGINT}; + std::vector rType = {TSDB_DATA_TYPE_UTINYINT, TSDB_DATA_TYPE_USMALLINT, TSDB_DATA_TYPE_UINT, + TSDB_DATA_TYPE_UBIGINT}; + + doCompare(lType, rType, OP_TYPE_GREATER_THAN); + doCompare(rType, lType, OP_TYPE_GREATER_THAN); +} int main(int argc, char **argv) { taosSeedRand(taosGetTimestampSec()); diff --git a/source/util/src/tcompare.c b/source/util/src/tcompare.c index f8f78ae6a5..dc57ed97b2 100644 --- a/source/util/src/tcompare.c +++ b/source/util/src/tcompare.c @@ -308,17 +308,19 @@ int32_t compareInt8Uint16(const void *pLeft, const void *pRight) { int32_t compareInt8Uint32(const void *pLeft, const void *pRight) { int8_t left = GET_INT8_VAL(pLeft); + if (left < 0) return -1; uint32_t right = GET_UINT32_VAL(pRight); - if (left > right) return 1; - if (left < right) return -1; + if ((uint32_t)left > right) return 1; + if ((uint32_t)left < right) return -1; return 0; } int32_t compareInt8Uint64(const void *pLeft, const void *pRight) { int8_t left = GET_INT8_VAL(pLeft); + if (left < 0) return -1; uint64_t right = GET_UINT64_VAL(pRight); - if (left > right) return 1; - if (left < right) return -1; + if ((uint64_t)left > right) return 1; + if ((uint64_t)left < right) return -1; return 0; } @@ -380,17 +382,19 @@ int32_t compareInt16Uint16(const void *pLeft, const void *pRight) { int32_t compareInt16Uint32(const void *pLeft, const void *pRight) { int16_t left = GET_INT16_VAL(pLeft); + if (left < 0) return -1; uint32_t right = GET_UINT32_VAL(pRight); - if (left > right) return 1; - if (left < right) return -1; + if ((uint32_t)left > right) return 1; + if ((uint32_t)left < right) return -1; return 0; } int32_t compareInt16Uint64(const void *pLeft, const void *pRight) { int16_t left = GET_INT16_VAL(pLeft); + if (left < 0) return -1; uint64_t right = GET_UINT64_VAL(pRight); - if (left > right) return 1; - if (left < right) return -1; + if ((uint64_t)left > right) return 1; + if ((uint64_t)left < right) return -1; return 0; } @@ -452,17 +456,19 @@ int32_t compareInt32Uint16(const void *pLeft, const void *pRight) { int32_t compareInt32Uint32(const void *pLeft, const void *pRight) { int32_t left = GET_INT32_VAL(pLeft); + if (left < 0) return -1; uint32_t right = GET_UINT32_VAL(pRight); - if (left > right) return 1; - if (left < right) return -1; + if ((uint32_t)left > right) return 1; + if ((uint32_t)left < right) return -1; return 0; } int32_t compareInt32Uint64(const void *pLeft, const void *pRight) { int32_t left = GET_INT32_VAL(pLeft); + if (left < 0) return -1; uint64_t right = GET_UINT64_VAL(pRight); - if (left > right) return 1; - if (left < right) return -1; + if ((uint64_t)left > right) return 1; + if ((uint64_t)left < right) return -1; return 0; } @@ -532,9 +538,10 @@ int32_t compareInt64Uint32(const void *pLeft, const void *pRight) { int32_t compareInt64Uint64(const void *pLeft, const void *pRight) { int64_t left = GET_INT64_VAL(pLeft); + if (left < 0) return -1; uint64_t right = GET_UINT64_VAL(pRight); - if (left > right) return 1; - if (left < right) return -1; + if ((uint64_t)left > right) return 1; + if ((uint64_t)left < right) return -1; return 0; } @@ -857,24 +864,27 @@ int32_t compareUint16Uint64(const void *pLeft, const void *pRight) { int32_t compareUint32Int8(const void *pLeft, const void *pRight) { uint32_t left = GET_UINT32_VAL(pLeft); int8_t right = GET_INT8_VAL(pRight); - if (left > right) return 1; - if (left < right) return -1; + if (right < 0) return 1; + if (left > (uint32_t)right) return 1; + if (left < (uint32_t)right) return -1; return 0; } int32_t compareUint32Int16(const void *pLeft, const void *pRight) { uint32_t left = GET_UINT32_VAL(pLeft); int16_t right = GET_INT16_VAL(pRight); - if (left > right) return 1; - if (left < right) return -1; + if (right < 0) return 1; + if (left > (uint32_t)right) return 1; + if (left < (uint32_t)right) return -1; return 0; } int32_t compareUint32Int32(const void *pLeft, const void *pRight) { uint32_t left = GET_UINT32_VAL(pLeft); int32_t right = GET_INT32_VAL(pRight); - if (left > right) return 1; - if (left < right) return -1; + if (right < 0) return 1; + if (left > (uint32_t)right) return 1; + if (left < (uint32_t)right) return -1; return 0; } @@ -929,32 +939,36 @@ int32_t compareUint32Uint64(const void *pLeft, const void *pRight) { int32_t compareUint64Int8(const void *pLeft, const void *pRight) { uint64_t left = GET_UINT64_VAL(pLeft); int8_t right = GET_INT8_VAL(pRight); - if (left > right) return 1; - if (left < right) return -1; + if (right < 0) return 1; + if (left > (uint64_t)right) return 1; + if (left < (uint64_t)right) return -1; return 0; } int32_t compareUint64Int16(const void *pLeft, const void *pRight) { uint64_t left = GET_UINT64_VAL(pLeft); int16_t right = GET_INT16_VAL(pRight); - if (left > right) return 1; - if (left < right) return -1; + if (right < 0) return 1; + if (left > (uint64_t)right) return 1; + if (left < (uint64_t)right) return -1; return 0; } int32_t compareUint64Int32(const void *pLeft, const void *pRight) { uint64_t left = GET_UINT64_VAL(pLeft); int32_t right = GET_INT32_VAL(pRight); - if (left > right) return 1; - if (left < right) return -1; + if (right < 0) return 1; + if (left > (uint64_t)right) return 1; + if (left < (uint64_t)right) return -1; return 0; } int32_t compareUint64Int64(const void *pLeft, const void *pRight) { uint64_t left = GET_UINT64_VAL(pLeft); int64_t right = GET_INT64_VAL(pRight); - if (left > right) return 1; - if (left < right) return -1; + if (right < 0) return 1; + if (left > (uint64_t)right) return 1; + if (left < (uint64_t)right) return -1; return 0; }