Merge pull request #18571 from taosdata/feature/TD-14761
opti:get meta cost for schemaless & add config for write batch in schemaless
This commit is contained in:
commit
d1a0e72459
|
@ -129,6 +129,7 @@ extern char tsUdfdLdLibPath[];
|
|||
extern char tsSmlChildTableName[];
|
||||
extern char tsSmlTagName[];
|
||||
extern bool tsSmlDataFormat;
|
||||
extern int32_t tsSmlBatchSize;
|
||||
|
||||
// wal
|
||||
extern int64_t tsWalFsyncDataSizeLimit;
|
||||
|
|
|
@ -79,7 +79,6 @@
|
|||
#define NCHAR_ADD_LEN 3 // L"nchar" 3 means L" "
|
||||
|
||||
#define MAX_RETRY_TIMES 5
|
||||
#define LINE_BATCH 2000
|
||||
//=================================================================================================
|
||||
typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType;
|
||||
|
||||
|
@ -467,6 +466,13 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
goto end;
|
||||
}
|
||||
info->cost.numOfCreateSTables++;
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
|
||||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
|
||||
goto end;
|
||||
}
|
||||
} else if (code == TSDB_CODE_SUCCESS) {
|
||||
hashTmp = taosHashInit(pTableMeta->tableInfo.numOfTags, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true,
|
||||
HASH_NO_LOCK);
|
||||
|
@ -505,7 +511,6 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
|
||||
|
@ -516,6 +521,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
taosHashClear(hashTmp);
|
||||
for (uint16_t i = 0; i < pTableMeta->tableInfo.numOfColumns; i++) {
|
||||
|
@ -552,12 +558,18 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
uError("SML:0x%" PRIx64 " smlSendMetaMsg failed. can not create %s", info->id, pName.tname);
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
code = catalogRefreshTableMeta(info->pCatalog, &conn, &pName, -1);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
goto end;
|
||||
}
|
||||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
|
||||
goto end;
|
||||
}
|
||||
}
|
||||
|
||||
needCheckMeta = true;
|
||||
taosHashCleanup(hashTmp);
|
||||
hashTmp = NULL;
|
||||
|
@ -565,13 +577,6 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
uError("SML:0x%" PRIx64 " load table meta error: %s", info->id, tstrerror(code));
|
||||
goto end;
|
||||
}
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
|
||||
code = catalogGetSTableMeta(info->pCatalog, &conn, &pName, &pTableMeta);
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
uError("SML:0x%" PRIx64 " catalogGetSTableMeta failed. super table name %s", info->id, pName.tname);
|
||||
goto end;
|
||||
}
|
||||
|
||||
if (needCheckMeta) {
|
||||
code = smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags,
|
||||
|
@ -596,7 +601,7 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) {
|
|||
end:
|
||||
taosHashCleanup(hashTmp);
|
||||
taosMemoryFreeClear(pTableMeta);
|
||||
catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
|
||||
// catalogRefreshTableMeta(info->pCatalog, &conn, &pName, 1);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -815,6 +820,11 @@ static int8_t smlGetTsTypeByPrecision(int8_t precision) {
|
|||
}
|
||||
|
||||
static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t len) {
|
||||
void *tmp = taosMemoryCalloc(1, len + 1);
|
||||
memcpy(tmp, data, len);
|
||||
uDebug("SML:0x%" PRIx64 " smlParseInfluxTime tslen:%d, ts:%s", info->id, len, (char*)tmp);
|
||||
taosMemoryFree(tmp);
|
||||
|
||||
if (len == 0 || (len == 1 && data[0] == '0')) {
|
||||
return taosGetTimestampNs();
|
||||
}
|
||||
|
@ -2066,7 +2076,10 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *
|
|||
|
||||
static int32_t smlParseInfluxLine(SSmlHandle *info, const char *sql, const int len) {
|
||||
SSmlLineInfo elements = {0};
|
||||
uDebug("SML:0x%" PRIx64 " smlParseInfluxLine sql:%s", info->id, (info->isRawLine ? "rawdata" : sql));
|
||||
void *tmp = taosMemoryCalloc(1, len + 1);
|
||||
memcpy(tmp, sql, len);
|
||||
uDebug("SML:0x%" PRIx64 " smlParseInfluxLine raw:%d, len:%d, sql:%s", info->id, info->isRawLine, len, (info->isRawLine ? (char*)tmp : sql));
|
||||
taosMemoryFree(tmp);
|
||||
|
||||
int ret = smlParseInfluxString(sql, sql + len, &elements, &info->msgBuf);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -2562,7 +2575,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
|
|||
goto end;
|
||||
}
|
||||
|
||||
batchs = ceil(((double)numLines) / LINE_BATCH);
|
||||
batchs = ceil(((double)numLines) / tsSmlBatchSize);
|
||||
params.total = batchs;
|
||||
for (int i = 0; i < batchs; ++i) {
|
||||
SRequestObj *req = (SRequestObj *)createRequest(pTscObj->id, TSDB_SQL_INSERT, 0);
|
||||
|
@ -2581,7 +2594,7 @@ TAOS_RES *taos_schemaless_insert_inner(SRequestObj *request, char *lines[], char
|
|||
info->isRawLine = (rawLine == NULL);
|
||||
info->ttl = ttl;
|
||||
|
||||
int32_t perBatch = LINE_BATCH;
|
||||
int32_t perBatch = tsSmlBatchSize;
|
||||
|
||||
if (numLines > perBatch) {
|
||||
numLines -= perBatch;
|
||||
|
|
|
@ -75,6 +75,7 @@ char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; // user defined child table
|
|||
// If set to empty system will generate table name using MD5 hash.
|
||||
// true means that the name and order of cols in each line are the same(only for influx protocol)
|
||||
bool tsSmlDataFormat = false;
|
||||
int32_t tsSmlBatchSize = 10000;
|
||||
|
||||
// query
|
||||
int32_t tsQueryPolicy = 1;
|
||||
|
@ -306,6 +307,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
|||
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
|
||||
if (cfgAddString(pCfg, "smlTagName", tsSmlTagName, 1) != 0) return -1;
|
||||
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "smlBatchSize", tsSmlBatchSize, 1, INT32_MAX, true) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "maxMemUsedByInsert", tsMaxMemUsedByInsert, 1, INT32_MAX, true) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "rpcRetryLimit", tsRpcRetryLimit, 1, 100000, 0) != 0) return -1;
|
||||
if (cfgAddInt32(pCfg, "rpcRetryInterval", tsRpcRetryInterval, 1, 100000, 0) != 0) return -1;
|
||||
|
@ -648,6 +650,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
|||
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
||||
tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
|
||||
|
||||
tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32;
|
||||
tsMaxMemUsedByInsert = cfgGetItem(pCfg, "maxMemUsedByInsert")->i32;
|
||||
|
||||
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
|
||||
|
@ -1021,6 +1024,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
|
|||
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
||||
} else if (strcasecmp("smlDataFormat", name) == 0) {
|
||||
tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
|
||||
} else if (strcasecmp("smlBatchSize", name) == 0) {
|
||||
tsSmlBatchSize = cfgGetItem(pCfg, "smlBatchSize")->i32;
|
||||
} else if (strcasecmp("shellActivityTimer", name) == 0) {
|
||||
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
|
||||
} else if (strcasecmp("supportVnodes", name) == 0) {
|
||||
|
|
Loading…
Reference in New Issue