From e07823c55a0644267bb2e5f79b4f35919ee5cf71 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 30 Nov 2022 13:59:33 +0800 Subject: [PATCH 1/6] opti: get meta logic for schemaless --- source/client/src/clientSml.c | 43 ++++++++++++++++++----------------- 1 file changed, 22 insertions(+), 21 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index f4d8c80e3f..985b758869 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -467,6 +467,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { goto end; } info->cost.numOfCreateSTables++; + taosMemoryFreeClear(pTableMeta); + + code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); + if (code != TSDB_CODE_SUCCESS) { + uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname); + goto end; + } } else if (code == TSDB_CODE_SUCCESS) { hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); @@ -505,16 +512,16 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname); goto end; } - } - taosMemoryFreeClear(pTableMeta); - code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } - code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); - if (code != TSDB_CODE_SUCCESS) { - goto end; + taosMemoryFreeClear(pTableMeta); + code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1); + if (code != TSDB_CODE_SUCCESS) { + goto end; + } + code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); + if (code != TSDB_CODE_SUCCESS) { + goto end; + } } taosHashClear(hashTmp); @@ -552,12 +559,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname); goto end; } + + code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1); + if (code != TSDB_CODE_SUCCESS) { + goto end; + } } - code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1); - if (code != TSDB_CODE_SUCCESS) { - goto end; - } needCheckMeta = true; taosHashCleanup(hashTmp); hashTmp = NULL; @@ -565,13 +573,6 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code)); goto end; } - taosMemoryFreeClear(pTableMeta); - - code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname); - goto end; - } if (needCheckMeta) { code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags, @@ -596,7 +597,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { end: taosHashCleanup(hashTmp); taosMemoryFreeClear(pTableMeta); - catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); +// catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1); return code; } From 162575ea66a4fedadbbf225dc692c5fbff4bc5b5 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 30 Nov 2022 14:30:44 +0800 Subject: [PATCH 2/6] opti:get meta cost for schemaless & add config for write batch in schemaless --- include/common/tglobal.h | 1 + source/client/src/clientSml.c | 5 ++--- source/common/src/tglobal.c | 5 +++++ 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/include/common/tglobal.h b/include/common/tglobal.h index 0ded2bc4b5..24ed898163 100644 --- a/include/common/tglobal.h +++ b/include/common/tglobal.h @@ -125,6 +125,7 @@ extern char tsUdfdLdLibPath[]; extern char tsSmlChildTableName[]; extern char tsSmlTagName[]; extern bool tsSmlDataFormat; +extern int32_t tsSmlBatchSize; // wal extern int64_t tsWalFsyncDataSizeLimit; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 985b758869..28e873cca3 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -79,7 +79,6 @@ #define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" " #define MAX_RETRY_TIMES 5 -#define LINE_BATCH 2000 //================================================================================================= typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; @@ -2563,7 +2562,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char goto end; } - batchs = ceil(((double)numLines) / LINE_BATCH); + batchs = ceil(((double)numLines) / tsSmlBatchSize); params.total = batchs; for (int i = 0; i < batchs; ++i) { SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT, 0); @@ -2582,7 +2581,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char info->isRawLine = (rawLine == NULL); info->ttl = ttl; - int32_t perBatch = LINE_BATCH; + int32_t perBatch = tsSmlBatchSize; if (numLines > perBatch) { numLines -= perBatch; diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index aeef1b5277..d922b5342b 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -75,6 +75,7 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table // If set to empty system will generate table name using MD5 hash. // true means that the name and order of cols in each line are the same(only for influx protocol) bool tsSmlDataFormat = false; +int32_t tsSmlBatchSize = 10000; // query int32_t tsQueryPolicy = 1; @@ -302,6 +303,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) { if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1; if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1; if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1; + if (cfgAddInt32(pCfg, "smlBatchSize", tsSmlBatchSize, 1, INT32_MAX, true) != 0) return -1; if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1; if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1; @@ -643,6 +645,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) { tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN); tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval; + tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32; tsMaxMemUsedByInsert = cfgGetItem(pCfg, "maxMemUsedByInsert")->i32; tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32; @@ -1013,6 +1016,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) { tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN); } else if (strcasecmp("smlDataFormat", name) == 0) { tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval; + } else if (strcasecmp("smlBatchSize", name) == 0) { + tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32; } else if (strcasecmp("shellActivityTimer", name) == 0) { tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32; } else if (strcasecmp("supportVnodes", name) == 0) { From ad61b5b593e4ca745b5329fcecc2607a7bedc9d5 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 30 Nov 2022 17:07:30 +0800 Subject: [PATCH 3/6] fix:add log --- source/client/src/clientSml.c | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 28e873cca3..5381b2c0dd 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -815,6 +815,11 @@ static int8_t smlGetTsTypeByPrecision(int8_t precision) { } static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) { + char *tmp = taosMemoryCalloc(1, len + 1); + memcpy(tmp, data, len); + uDebug("SML:0x%" PRIx64 " smlParseInfluxTime ts:%s", info->id, tmp); + taosMemoryFree(tmp); + if (len == 0 || (len == 1 && data[0] == '0')) { return taosGetTimestampNs(); } @@ -2066,7 +2071,10 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) { SSmlLineInfo elements = {0}; - uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s", info->id, (info->isRawLine ? "rawdata" : sql)); + char *tmp = taosMemoryCalloc(1, len + 1); + memcpy(tmp, sql, len); + uDebug("SML:0x%" PRIx64 " smlParseInfluxLine raw:%d, sql:%s", info->id, info->isRawLine, (info->isRawLine ? tmp : sql)); + taosMemoryFree(tmp); int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf); if (ret != TSDB_CODE_SUCCESS) { From 3f82427e7d0bed75f984ad0a9e60f835d6b83929 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 30 Nov 2022 17:11:17 +0800 Subject: [PATCH 4/6] fix:add log --- source/client/src/clientSml.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 5381b2c0dd..17cbf377f5 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -815,7 +815,7 @@ static int8_t smlGetTsTypeByPrecision(int8_t precision) { } static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) { - char *tmp = taosMemoryCalloc(1, len + 1); + char *tmp = (char*)taosMemoryCalloc(1, len + 1); memcpy(tmp, data, len); uDebug("SML:0x%" PRIx64 " smlParseInfluxTime ts:%s", info->id, tmp); taosMemoryFree(tmp); @@ -2071,7 +2071,7 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) { SSmlLineInfo elements = {0}; - char *tmp = taosMemoryCalloc(1, len + 1); + char *tmp = (char*)taosMemoryCalloc(1, len + 1); memcpy(tmp, sql, len); uDebug("SML:0x%" PRIx64 " smlParseInfluxLine raw:%d, sql:%s", info->id, info->isRawLine, (info->isRawLine ? tmp : sql)); taosMemoryFree(tmp); From 2bd4c09568dbc7e5778638c144d3decf85585029 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 30 Nov 2022 18:02:30 +0800 Subject: [PATCH 5/6] fix:get meta if cols change in schemaless --- source/client/src/clientSml.c | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 17cbf377f5..9a08f88cfa 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -563,6 +563,11 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { if (code != TSDB_CODE_SUCCESS) { goto end; } + code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta); + if (code != TSDB_CODE_SUCCESS) { + uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname); + goto end; + } } needCheckMeta = true; From 9eae79a1f91a0779d84774183b28d601b5df7c4e Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 30 Nov 2022 19:36:02 +0800 Subject: [PATCH 6/6] fix: complie error in windows --- source/client/src/clientSml.c | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 9a08f88cfa..d811eb7fec 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -820,9 +820,9 @@ static int8_t smlGetTsTypeByPrecision(int8_t precision) { } static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) { - char *tmp = (char*)taosMemoryCalloc(1, len + 1); + void *tmp = taosMemoryCalloc(1, len + 1); memcpy(tmp, data, len); - uDebug("SML:0x%" PRIx64 " smlParseInfluxTime ts:%s", info->id, tmp); + uDebug("SML:0x%" PRIx64 " smlParseInfluxTime tslen:%d, ts:%s", info->id, len, (char*)tmp); taosMemoryFree(tmp); if (len == 0 || (len == 1 && data[0] == '0')) { @@ -2076,9 +2076,9 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo * static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) { SSmlLineInfo elements = {0}; - char *tmp = (char*)taosMemoryCalloc(1, len + 1); + void *tmp = taosMemoryCalloc(1, len + 1); memcpy(tmp, sql, len); - uDebug("SML:0x%" PRIx64 " smlParseInfluxLine raw:%d, sql:%s", info->id, info->isRawLine, (info->isRawLine ? tmp : sql)); + uDebug("SML:0x%" PRIx64 " smlParseInfluxLine raw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? (char*)tmp : sql)); taosMemoryFree(tmp); int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf);