commit
aefc6052c2
|
@ -173,6 +173,7 @@ typedef struct SSuperTable_S {
|
|||
int childTblCount;
|
||||
bool superTblExists; // 0: no, 1: yes
|
||||
bool childTblExists; // 0: no, 1: yes
|
||||
int batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql
|
||||
int8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
|
||||
char childTblPrefix[MAX_TB_NAME_SIZE];
|
||||
char dataSource[MAX_TB_NAME_SIZE]; // rand_gen or sample
|
||||
|
@ -808,13 +809,14 @@ static void init_rand_data() {
|
|||
|
||||
static void printfInsertMeta() {
|
||||
printf("\033[1m\033[40;32m================ insert.json parse result START ================\033[0m\n");
|
||||
printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port);
|
||||
printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
|
||||
printf("password: \033[33m%s\033[0m\n", g_Dbs.password);
|
||||
printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile);
|
||||
printf("thread count: \033[33m%d\033[0m\n", g_Dbs.threadCount);
|
||||
printf("host: \033[33m%s:%u\033[0m\n", g_Dbs.host, g_Dbs.port);
|
||||
printf("user: \033[33m%s\033[0m\n", g_Dbs.user);
|
||||
printf("password: \033[33m%s\033[0m\n", g_Dbs.password);
|
||||
printf("resultFile: \033[33m%s\033[0m\n", g_Dbs.resultFile);
|
||||
printf("thread num of insert data: \033[33m%d\033[0m\n", g_Dbs.threadCount);
|
||||
printf("thread num of create table: \033[33m%d\033[0m\n", g_Dbs.threadCountByCreateTbl);
|
||||
|
||||
printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
|
||||
printf("database count: \033[33m%d\033[0m\n", g_Dbs.dbCount);
|
||||
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
||||
printf("database[\033[33m%d\033[0m]:\n", i);
|
||||
printf(" database name: \033[33m%s\033[0m\n", g_Dbs.db[i].dbName);
|
||||
|
@ -944,11 +946,12 @@ static void printfInsertMeta() {
|
|||
|
||||
static void printfInsertMetaToFile(FILE* fp) {
|
||||
fprintf(fp, "================ insert.json parse result START================\n");
|
||||
fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port);
|
||||
fprintf(fp, "user: %s\n", g_Dbs.user);
|
||||
fprintf(fp, "password: %s\n", g_Dbs.password);
|
||||
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
|
||||
fprintf(fp, "thread count: %d\n", g_Dbs.threadCount);
|
||||
fprintf(fp, "host: %s:%u\n", g_Dbs.host, g_Dbs.port);
|
||||
fprintf(fp, "user: %s\n", g_Dbs.user);
|
||||
fprintf(fp, "password: %s\n", g_Dbs.password);
|
||||
fprintf(fp, "resultFile: %s\n", g_Dbs.resultFile);
|
||||
fprintf(fp, "thread num of insert data: %d\n", g_Dbs.threadCount);
|
||||
fprintf(fp, "thread num of create table: %d\n", g_Dbs.threadCountByCreateTbl);
|
||||
|
||||
fprintf(fp, "database count: %d\n", g_Dbs.dbCount);
|
||||
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
||||
|
@ -1730,19 +1733,27 @@ static int createDatabases() {
|
|||
|
||||
|
||||
void * createTable(void *sarg)
|
||||
{
|
||||
char command[BUFFER_SIZE] = "\0";
|
||||
|
||||
{
|
||||
threadInfo *winfo = (threadInfo *)sarg;
|
||||
SSuperTable* superTblInfo = winfo->superTblInfo;
|
||||
|
||||
int64_t lastPrintTime = taosGetTimestampMs();
|
||||
|
||||
char* buffer = calloc(superTblInfo->maxSqlLen, 1);
|
||||
|
||||
int len = 0;
|
||||
int batchNum = 0;
|
||||
//printf("Creating table from %d to %d\n", winfo->start_table_id, winfo->end_table_id);
|
||||
for (int i = winfo->start_table_id; i <= winfo->end_table_id; i++) {
|
||||
if (0 == g_Dbs.use_metric) {
|
||||
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable);
|
||||
snprintf(buffer, BUFFER_SIZE, "create table if not exists %s.%s%d %s;", winfo->db_name, superTblInfo->childTblPrefix, i, superTblInfo->colsOfCreatChildTable);
|
||||
} else {
|
||||
if (0 == len) {
|
||||
batchNum = 0;
|
||||
memset(buffer, 0, superTblInfo->maxSqlLen);
|
||||
len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "create table ");
|
||||
}
|
||||
|
||||
char* tagsValBuf = NULL;
|
||||
if (0 == superTblInfo->tagSource) {
|
||||
tagsValBuf = generateTagVaulesForStb(superTblInfo);
|
||||
|
@ -1750,13 +1761,22 @@ void * createTable(void *sarg)
|
|||
tagsValBuf = getTagValueFromTagSample(superTblInfo, i % superTblInfo->tagSampleCount);
|
||||
}
|
||||
if (NULL == tagsValBuf) {
|
||||
free(buffer);
|
||||
return NULL;
|
||||
}
|
||||
snprintf(command, BUFFER_SIZE, "create table if not exists %s.%s%d using %s.%s tags %s;", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
|
||||
|
||||
len += snprintf(buffer + len, superTblInfo->maxSqlLen - len, "if not exists %s.%s%d using %s.%s tags %s ", winfo->db_name, superTblInfo->childTblPrefix, i, winfo->db_name, superTblInfo->sTblName, tagsValBuf);
|
||||
free(tagsValBuf);
|
||||
batchNum++;
|
||||
|
||||
if ((batchNum < superTblInfo->batchCreateTableNum) && ((superTblInfo->maxSqlLen - len) >= (superTblInfo->lenOfTagOfOneRow + 256))) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (0 != queryDbExec(winfo->taos, command, NO_INSERT_TYPE)){
|
||||
|
||||
len = 0;
|
||||
if (0 != queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE)){
|
||||
free(buffer);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -1766,7 +1786,12 @@ void * createTable(void *sarg)
|
|||
lastPrintTime = currentPrintTime;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
if (0 != len) {
|
||||
(void)queryDbExec(winfo->taos, buffer, NO_INSERT_TYPE);
|
||||
}
|
||||
|
||||
free(buffer);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -2422,6 +2447,16 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
printf("failed to read json, auto_create_table not found");
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num");
|
||||
if (batchCreateTbl && batchCreateTbl->type == cJSON_Number) {
|
||||
g_Dbs.db[i].superTbls[j].batchCreateTableNum = batchCreateTbl->valueint;
|
||||
} else if (!batchCreateTbl) {
|
||||
g_Dbs.db[i].superTbls[j].batchCreateTableNum = 2000;
|
||||
} else {
|
||||
printf("failed to read json, batch_create_tbl_num not found");
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
cJSON *childTblExists = cJSON_GetObjectItem(stbInfo, "child_table_exists"); // yes, no
|
||||
if (childTblExists && childTblExists->type == cJSON_String && childTblExists->valuestring != NULL) {
|
||||
|
@ -3679,14 +3714,14 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
|
|||
b = ntables % threads;
|
||||
}
|
||||
|
||||
TAOS* taos;
|
||||
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
|
||||
taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
|
||||
if (NULL == taos) {
|
||||
printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
|
||||
exit(-1);
|
||||
}
|
||||
}
|
||||
//TAOS* taos;
|
||||
//if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
|
||||
// taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
|
||||
// if (NULL == taos) {
|
||||
// printf("connect to server fail, reason: %s\n", taos_errstr(NULL));
|
||||
// exit(-1);
|
||||
// }
|
||||
//}
|
||||
|
||||
int32_t timePrec = TSDB_TIME_PRECISION_MILLI;
|
||||
if (0 != precision[0]) {
|
||||
|
@ -3719,7 +3754,12 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
|
|||
t_info->start_time = start_time;
|
||||
|
||||
if (0 == strncasecmp(superTblInfo->insertMode, "taosc", 5)) {
|
||||
t_info->taos = taos;
|
||||
//t_info->taos = taos;
|
||||
t_info->taos = taos_connect(g_Dbs.host, g_Dbs.user, g_Dbs.password, db_name, g_Dbs.port);
|
||||
if (NULL == t_info->taos) {
|
||||
printf("connect to server fail from insert sub thread, reason: %s\n", taos_errstr(NULL));
|
||||
exit(-1);
|
||||
}
|
||||
} else {
|
||||
t_info->taos = NULL;
|
||||
#ifdef TD_LOWA_CURL
|
||||
|
@ -3754,6 +3794,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
|
|||
threadInfo *t_info = infos + i;
|
||||
|
||||
tsem_destroy(&(t_info->lock_sem));
|
||||
taos_close(t_info->taos);
|
||||
|
||||
superTblInfo->totalAffectedRows += t_info->totalAffectedRows;
|
||||
superTblInfo->totalRowsInserted += t_info->totalRowsInserted;
|
||||
|
@ -3766,7 +3807,7 @@ void startMultiThreadInsertData(int threads, char* db_name, char* precision, SSu
|
|||
|
||||
double end = getCurrentTime();
|
||||
|
||||
taos_close(taos);
|
||||
//taos_close(taos);
|
||||
|
||||
free(pids);
|
||||
free(infos);
|
||||
|
|
Loading…
Reference in New Issue