Hotfix/sangshuduo/td 4406 taosdemo auto create tables for develop (#6288)
* cherry-pick from master branch. * cherry-pick from master branch. c23043c0ced30f7b1f85b0ae65db405e9fc70b5d Co-authored-by: Shuduo Sang <sdsang@taosdata.com>
This commit is contained in:
parent
88f3e7aad8
commit
889ed47b56
|
@ -255,12 +255,13 @@ typedef struct SColumn_S {
|
|||
|
||||
typedef struct SSuperTable_S {
|
||||
char sTblName[MAX_TB_NAME_SIZE+1];
|
||||
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
|
||||
char childTblPrefix[MAX_TB_NAME_SIZE];
|
||||
char insertMode[MAX_TB_NAME_SIZE]; // taosc, rest
|
||||
uint16_t childTblExists;
|
||||
int64_t childTblCount;
|
||||
bool childTblExists; // 0: no, 1: yes
|
||||
uint64_t batchCreateTableNum; // 0: no batch, > 0: batch table number in one sql
|
||||
uint8_t autoCreateTable; // 0: create sub table, 1: auto create sub table
|
||||
char childTblPrefix[MAX_TB_NAME_SIZE];
|
||||
char dataSource[MAX_TB_NAME_SIZE+1]; // rand_gen or sample
|
||||
uint16_t iface; // 0: taosc, 1: rest, 2: stmt
|
||||
int64_t childTblLimit;
|
||||
uint64_t childTblOffset;
|
||||
|
@ -830,7 +831,7 @@ static void parse_args(int argc, char *argv[], SArguments *arguments) {
|
|||
if ((argc == i+1)
|
||||
|| (!isStringNumber(argv[i+1]))) {
|
||||
printHelp();
|
||||
errorPrint("%s", "\n\t-q need a number following!\nQuery mode -- 0: SYNC, 1: ASYNC. Default is SYNC.\n");
|
||||
errorPrint("%s", "\n\t-q need a number following!\nQuery mode -- 0: SYNC, not-0: ASYNC. Default is SYNC.\n");
|
||||
exit(EXIT_FAILURE);
|
||||
}
|
||||
arguments->async_mode = atoi(argv[++i]);
|
||||
|
@ -1471,7 +1472,8 @@ static int printfInsertMeta() {
|
|||
|
||||
if (PRE_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
|
||||
printf(" autoCreateTable: \033[33m%s\033[0m\n", "no");
|
||||
} else if (AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable) {
|
||||
} else if (AUTO_CREATE_SUBTBL ==
|
||||
g_Dbs.db[i].superTbls[j].autoCreateTable) {
|
||||
printf(" autoCreateTable: \033[33m%s\033[0m\n", "yes");
|
||||
} else {
|
||||
printf(" autoCreateTable: \033[33m%s\033[0m\n", "error");
|
||||
|
@ -3063,64 +3065,61 @@ static void createChildTables() {
|
|||
char tblColsBuf[MAX_SQL_SIZE];
|
||||
int len;
|
||||
|
||||
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
||||
if (g_Dbs.use_metric) {
|
||||
if (g_Dbs.db[i].superTblCount > 0) {
|
||||
// with super table
|
||||
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
||||
if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable)
|
||||
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
|
||||
continue;
|
||||
}
|
||||
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
||||
if (g_Dbs.use_metric) {
|
||||
if (g_Dbs.db[i].superTblCount > 0) {
|
||||
// with super table
|
||||
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
||||
if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable)
|
||||
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
|
||||
continue;
|
||||
}
|
||||
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
|
||||
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
|
||||
uint64_t startFrom = 0;
|
||||
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
|
||||
|
||||
verbosePrint("%s() LN%d: %s\n", __func__, __LINE__,
|
||||
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
|
||||
uint64_t tableFrom = 0;
|
||||
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
|
||||
verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n",
|
||||
__func__, __LINE__, g_totalChildTables, startFrom);
|
||||
|
||||
verbosePrint("%s() LN%d: create %"PRId64" child tables from %"PRIu64"\n",
|
||||
__func__, __LINE__, g_totalChildTables, tableFrom);
|
||||
startMultiThreadCreateChildTable(
|
||||
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
|
||||
g_Dbs.threadCountByCreateTbl,
|
||||
tableFrom,
|
||||
g_Dbs.db[i].superTbls[j].childTblCount,
|
||||
g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
|
||||
startMultiThreadCreateChildTable(
|
||||
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
|
||||
g_Dbs.threadCountByCreateTbl,
|
||||
startFrom,
|
||||
g_Dbs.db[i].superTbls[j].childTblCount,
|
||||
g_Dbs.db[i].dbName, &(g_Dbs.db[i].superTbls[j]));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// normal table
|
||||
len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP");
|
||||
for (int j = 0; j < g_args.num_of_CPR; j++) {
|
||||
if ((strncasecmp(g_args.datatype[j], "BINARY", strlen("BINARY")) == 0)
|
||||
|| (strncasecmp(g_args.datatype[j],
|
||||
"NCHAR", strlen("NCHAR")) == 0)) {
|
||||
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
|
||||
", COL%d %s(%d)", j, g_args.datatype[j], g_args.len_of_binary);
|
||||
} else {
|
||||
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
|
||||
", COL%d %s", j, g_args.datatype[j]);
|
||||
}
|
||||
len = strlen(tblColsBuf);
|
||||
}
|
||||
|
||||
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")");
|
||||
|
||||
verbosePrint("%s() LN%d: dbName: %s num of tb: %"PRId64" schema: %s\n",
|
||||
__func__, __LINE__,
|
||||
g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf);
|
||||
startMultiThreadCreateChildTable(
|
||||
tblColsBuf,
|
||||
g_Dbs.threadCountByCreateTbl,
|
||||
0,
|
||||
g_args.num_of_tables,
|
||||
g_Dbs.db[i].dbName,
|
||||
NULL);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// normal table
|
||||
len = snprintf(tblColsBuf, MAX_SQL_SIZE, "(TS TIMESTAMP");
|
||||
for (int j = 0; j < g_args.num_of_CPR; j++) {
|
||||
if (g_args.datatype[j]
|
||||
&& ((strncasecmp(g_args.datatype[j],
|
||||
"BINARY", strlen("BINARY")) == 0)
|
||||
|| (strncasecmp(g_args.datatype[j],
|
||||
"NCHAR", strlen("NCHAR")) == 0))) {
|
||||
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
|
||||
", COL%d %s(%d)", j, g_args.datatype[j],
|
||||
g_args.len_of_binary);
|
||||
} else {
|
||||
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len,
|
||||
", COL%d %s", j, g_args.datatype[j]);
|
||||
}
|
||||
len = strlen(tblColsBuf);
|
||||
}
|
||||
|
||||
snprintf(tblColsBuf + len, MAX_SQL_SIZE - len, ")");
|
||||
|
||||
verbosePrint("%s() LN%d: dbName: %s num of tb: %"PRId64" schema: %s\n",
|
||||
__func__, __LINE__,
|
||||
g_Dbs.db[i].dbName, g_args.num_of_tables, tblColsBuf);
|
||||
startMultiThreadCreateChildTable(
|
||||
tblColsBuf,
|
||||
g_Dbs.threadCountByCreateTbl,
|
||||
0,
|
||||
g_args.num_of_tables,
|
||||
g_Dbs.db[i].dbName,
|
||||
NULL);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
|
@ -3814,36 +3813,40 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
|
||||
// dbinfo
|
||||
cJSON *stbName = cJSON_GetObjectItem(stbInfo, "name");
|
||||
if (!stbName || stbName->type != cJSON_String || stbName->valuestring == NULL) {
|
||||
if (!stbName || stbName->type != cJSON_String
|
||||
|| stbName->valuestring == NULL) {
|
||||
errorPrint("%s() LN%d, failed to read json, stb name not found\n",
|
||||
__func__, __LINE__);
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring, MAX_TB_NAME_SIZE);
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].sTblName, stbName->valuestring,
|
||||
MAX_TB_NAME_SIZE);
|
||||
|
||||
cJSON *prefix = cJSON_GetObjectItem(stbInfo, "childtable_prefix");
|
||||
if (!prefix || prefix->type != cJSON_String || prefix->valuestring == NULL) {
|
||||
printf("ERROR: failed to read json, childtable_prefix not found\n");
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring, MAX_DB_NAME_SIZE);
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].childTblPrefix, prefix->valuestring,
|
||||
MAX_DB_NAME_SIZE);
|
||||
|
||||
cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table"); // yes, no, null
|
||||
cJSON *autoCreateTbl = cJSON_GetObjectItem(stbInfo, "auto_create_table");
|
||||
if (autoCreateTbl
|
||||
&& autoCreateTbl->type == cJSON_String
|
||||
&& autoCreateTbl->valuestring != NULL) {
|
||||
if (0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3)) {
|
||||
g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL;
|
||||
} else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) {
|
||||
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
|
||||
} else {
|
||||
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
|
||||
}
|
||||
if ((0 == strncasecmp(autoCreateTbl->valuestring, "yes", 3))
|
||||
&& (TBL_ALREADY_EXISTS != g_Dbs.db[i].superTbls[j].childTblExists)) {
|
||||
g_Dbs.db[i].superTbls[j].autoCreateTable = AUTO_CREATE_SUBTBL;
|
||||
} else if (0 == strncasecmp(autoCreateTbl->valuestring, "no", 2)) {
|
||||
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
|
||||
} else {
|
||||
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
|
||||
}
|
||||
} else if (!autoCreateTbl) {
|
||||
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
|
||||
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
|
||||
} else {
|
||||
printf("ERROR: failed to read json, auto_create_table not found\n");
|
||||
goto PARSE_OVER;
|
||||
printf("ERROR: failed to read json, auto_create_table not found\n");
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
cJSON* batchCreateTbl = cJSON_GetObjectItem(stbInfo, "batch_create_tbl_num");
|
||||
|
@ -3877,6 +3880,10 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
goto PARSE_OVER;
|
||||
}
|
||||
|
||||
if (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists) {
|
||||
g_Dbs.db[i].superTbls[j].autoCreateTable = PRE_CREATE_SUBTBL;
|
||||
}
|
||||
|
||||
cJSON* count = cJSON_GetObjectItem(stbInfo, "childtable_count");
|
||||
if (!count || count->type != cJSON_Number || 0 >= count->valueint) {
|
||||
errorPrint("%s() LN%d, failed to read json, childtable_count input mistake\n",
|
||||
|
@ -3934,7 +3941,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
cJSON* childTbl_offset = cJSON_GetObjectItem(stbInfo, "childtable_offset");
|
||||
if ((childTbl_offset) && (g_Dbs.db[i].drop != true)
|
||||
&& (g_Dbs.db[i].superTbls[j].childTblExists == TBL_ALREADY_EXISTS)) {
|
||||
if (childTbl_offset->type != cJSON_Number || 0 > childTbl_offset->valueint) {
|
||||
if ((childTbl_offset->type != cJSON_Number)
|
||||
|| (0 > childTbl_offset->valueint)) {
|
||||
printf("ERROR: failed to read json, childtable_offset\n");
|
||||
goto PARSE_OVER;
|
||||
}
|
||||
|
@ -3990,7 +3998,8 @@ static bool getMetaFromInsertJsonFile(cJSON* root) {
|
|||
}
|
||||
|
||||
cJSON *tagsFile = cJSON_GetObjectItem(stbInfo, "tags_file");
|
||||
if (tagsFile && tagsFile->type == cJSON_String && tagsFile->valuestring != NULL) {
|
||||
if ((tagsFile && tagsFile->type == cJSON_String)
|
||||
&& (tagsFile->valuestring != NULL)) {
|
||||
tstrncpy(g_Dbs.db[i].superTbls[j].tagsFile,
|
||||
tagsFile->valuestring, MAX_FILE_NAME_LEN);
|
||||
if (0 == g_Dbs.db[i].superTbls[j].tagsFile[0]) {
|
||||
|
@ -4946,26 +4955,29 @@ static int32_t execInsert(threadInfo *pThreadInfo, uint32_t k)
|
|||
static void getTableName(char *pTblName,
|
||||
threadInfo* pThreadInfo, uint64_t tableSeq)
|
||||
{
|
||||
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
||||
if ((superTblInfo)
|
||||
&& (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable)) {
|
||||
if (superTblInfo->childTblLimit > 0) {
|
||||
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
|
||||
superTblInfo->childTblName +
|
||||
(tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN);
|
||||
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
||||
if (superTblInfo) {
|
||||
if (AUTO_CREATE_SUBTBL != superTblInfo->autoCreateTable) {
|
||||
if (superTblInfo->childTblLimit > 0) {
|
||||
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
|
||||
superTblInfo->childTblName +
|
||||
(tableSeq - superTblInfo->childTblOffset) * TSDB_TABLE_NAME_LEN);
|
||||
} else {
|
||||
verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
pThreadInfo->start_table_from,
|
||||
pThreadInfo->ntables, tableSeq);
|
||||
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
|
||||
superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
|
||||
}
|
||||
} else {
|
||||
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
|
||||
superTblInfo->childTblPrefix, tableSeq);
|
||||
}
|
||||
} else {
|
||||
|
||||
verbosePrint("[%d] %s() LN%d: from=%"PRIu64" count=%"PRId64" seq=%"PRIu64"\n",
|
||||
pThreadInfo->threadID, __func__, __LINE__,
|
||||
pThreadInfo->start_table_from,
|
||||
pThreadInfo->ntables, tableSeq);
|
||||
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s",
|
||||
superTblInfo->childTblName + tableSeq * TSDB_TABLE_NAME_LEN);
|
||||
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
|
||||
g_args.tb_prefix, tableSeq);
|
||||
}
|
||||
} else {
|
||||
snprintf(pTblName, TSDB_TABLE_NAME_LEN, "%s%"PRIu64"",
|
||||
g_args.tb_prefix, tableSeq);
|
||||
}
|
||||
}
|
||||
|
||||
static int32_t generateDataTailWithoutStb(
|
||||
|
@ -6753,7 +6765,6 @@ static int insertTestProcess() {
|
|||
}
|
||||
}
|
||||
|
||||
// taosMsleep(1000);
|
||||
// create sub threads for inserting data
|
||||
//start = taosGetTimestampMs();
|
||||
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
||||
|
@ -7397,11 +7408,14 @@ static void *specifiedSubscribe(void *sarg) {
|
|||
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID] = taos_consume(
|
||||
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]);
|
||||
if (g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID]) {
|
||||
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0] != 0) {
|
||||
if (g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq][0]
|
||||
!= 0) {
|
||||
sprintf(pThreadInfo->filePath, "%s-%d",
|
||||
g_queryInfo.specifiedQueryInfo.result[pThreadInfo->querySeq],
|
||||
pThreadInfo->threadID);
|
||||
fetchResult(g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID], pThreadInfo);
|
||||
fetchResult(
|
||||
g_queryInfo.specifiedQueryInfo.res[pThreadInfo->threadID],
|
||||
pThreadInfo);
|
||||
}
|
||||
|
||||
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] ++;
|
||||
|
@ -7414,16 +7428,17 @@ static void *specifiedSubscribe(void *sarg) {
|
|||
g_queryInfo.specifiedQueryInfo.consumed[pThreadInfo->threadID] = 0;
|
||||
taos_unsubscribe(g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID],
|
||||
g_queryInfo.specifiedQueryInfo.subscribeKeepProgress);
|
||||
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] = subscribeImpl(
|
||||
SPECIFIED_CLASS,
|
||||
pThreadInfo,
|
||||
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
|
||||
g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
|
||||
g_queryInfo.specifiedQueryInfo.subscribeRestart,
|
||||
g_queryInfo.specifiedQueryInfo.subscribeInterval);
|
||||
g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID] =
|
||||
subscribeImpl(
|
||||
SPECIFIED_CLASS,
|
||||
pThreadInfo,
|
||||
g_queryInfo.specifiedQueryInfo.sql[pThreadInfo->querySeq],
|
||||
g_queryInfo.specifiedQueryInfo.topic[pThreadInfo->threadID],
|
||||
g_queryInfo.specifiedQueryInfo.subscribeRestart,
|
||||
g_queryInfo.specifiedQueryInfo.subscribeInterval);
|
||||
if (NULL == g_queryInfo.specifiedQueryInfo.tsub[pThreadInfo->threadID]) {
|
||||
taos_close(pThreadInfo->taos);
|
||||
return NULL;
|
||||
taos_close(pThreadInfo->taos);
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -7636,7 +7651,7 @@ static void setParaFromArg(){
|
|||
g_Dbs.threadCountByCreateTbl = g_args.num_of_threads;
|
||||
|
||||
g_Dbs.dbCount = 1;
|
||||
g_Dbs.db[0].drop = 1;
|
||||
g_Dbs.db[0].drop = true;
|
||||
|
||||
tstrncpy(g_Dbs.db[0].dbName, g_args.database, MAX_DB_NAME_SIZE);
|
||||
g_Dbs.db[0].dbCfg.replica = g_args.replica;
|
||||
|
|
Loading…
Reference in New Issue