refactor
This commit is contained in:
parent
d19b81c943
commit
64b462a40e
|
@ -291,7 +291,7 @@ typedef struct SSuperTable_S {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char name[TSDB_DB_NAME_LEN + 1];
|
char name[TSDB_DB_NAME_LEN + 1];
|
||||||
char create_time[32];
|
char create_time[32];
|
||||||
uint64_t ntables;
|
int64_t ntables;
|
||||||
int32_t vgroups;
|
int32_t vgroups;
|
||||||
int16_t replica;
|
int16_t replica;
|
||||||
int16_t quorum;
|
int16_t quorum;
|
||||||
|
@ -421,12 +421,13 @@ typedef struct SThreadInfo_S {
|
||||||
char tb_prefix[MAX_TB_NAME_SIZE];
|
char tb_prefix[MAX_TB_NAME_SIZE];
|
||||||
uint64_t start_table_from;
|
uint64_t start_table_from;
|
||||||
uint64_t end_table_to;
|
uint64_t end_table_to;
|
||||||
uint64_t ntables;
|
int64_t ntables;
|
||||||
uint64_t data_of_rate;
|
uint64_t data_of_rate;
|
||||||
int64_t start_time;
|
int64_t start_time;
|
||||||
char* cols;
|
char* cols;
|
||||||
bool use_metric;
|
bool use_metric;
|
||||||
SSuperTable* superTblInfo;
|
SSuperTable* superTblInfo;
|
||||||
|
char *buffer; // sql cmd buffer
|
||||||
|
|
||||||
// for async insert
|
// for async insert
|
||||||
tsem_t lock_sem;
|
tsem_t lock_sem;
|
||||||
|
@ -591,7 +592,7 @@ SArguments g_args = {
|
||||||
|
|
||||||
|
|
||||||
static SDbs g_Dbs;
|
static SDbs g_Dbs;
|
||||||
static int g_totalChildTables = 0;
|
static uint64_t g_totalChildTables = 0;
|
||||||
static SQueryMetaInfo g_queryInfo;
|
static SQueryMetaInfo g_queryInfo;
|
||||||
static FILE * g_fpOfInsertResult = NULL;
|
static FILE * g_fpOfInsertResult = NULL;
|
||||||
|
|
||||||
|
@ -1089,7 +1090,7 @@ static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type, bool quiet) {
|
||||||
TAOS_RES *res = NULL;
|
TAOS_RES *res = NULL;
|
||||||
int32_t code = -1;
|
int32_t code = -1;
|
||||||
|
|
||||||
for (i = 0; i < 5; i++) {
|
for (i = 0; i < 5 /* retry */; i++) {
|
||||||
if (NULL != res) {
|
if (NULL != res) {
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
res = NULL;
|
res = NULL;
|
||||||
|
@ -1135,7 +1136,6 @@ static void appendResultBufToFile(char *resultBuf, char *resultFile)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
fprintf(fp, "%s", resultBuf);
|
fprintf(fp, "%s", resultBuf);
|
||||||
tmfclose(fp);
|
tmfclose(fp);
|
||||||
}
|
}
|
||||||
|
@ -1583,8 +1583,8 @@ static void printfInsertMetaToFile(FILE* fp) {
|
||||||
}
|
}
|
||||||
|
|
||||||
fprintf(fp, " super table count: %"PRIu64"\n", g_Dbs.db[i].superTblCount);
|
fprintf(fp, " super table count: %"PRIu64"\n", g_Dbs.db[i].superTblCount);
|
||||||
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
||||||
fprintf(fp, " super table[%d]:\n", j);
|
fprintf(fp, " super table[%"PRIu64"]:\n", j);
|
||||||
|
|
||||||
fprintf(fp, " stbName: %s\n", g_Dbs.db[i].superTbls[j].sTblName);
|
fprintf(fp, " stbName: %s\n", g_Dbs.db[i].superTbls[j].sTblName);
|
||||||
|
|
||||||
|
@ -1913,7 +1913,7 @@ static int getDbFromServer(TAOS * taos, SDbInfo** dbInfos) {
|
||||||
formatTimestamp(dbInfos[count]->create_time,
|
formatTimestamp(dbInfos[count]->create_time,
|
||||||
*(int64_t*)row[TSDB_SHOW_DB_CREATED_TIME_INDEX],
|
*(int64_t*)row[TSDB_SHOW_DB_CREATED_TIME_INDEX],
|
||||||
TSDB_TIME_PRECISION_MILLI);
|
TSDB_TIME_PRECISION_MILLI);
|
||||||
dbInfos[count]->ntables = *((int32_t *)row[TSDB_SHOW_DB_NTABLES_INDEX]);
|
dbInfos[count]->ntables = *((int64_t *)row[TSDB_SHOW_DB_NTABLES_INDEX]);
|
||||||
dbInfos[count]->vgroups = *((int32_t *)row[TSDB_SHOW_DB_VGROUPS_INDEX]);
|
dbInfos[count]->vgroups = *((int32_t *)row[TSDB_SHOW_DB_VGROUPS_INDEX]);
|
||||||
dbInfos[count]->replica = *((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX]);
|
dbInfos[count]->replica = *((int16_t *)row[TSDB_SHOW_DB_REPLICA_INDEX]);
|
||||||
dbInfos[count]->quorum = *((int16_t *)row[TSDB_SHOW_DB_QUORUM_INDEX]);
|
dbInfos[count]->quorum = *((int16_t *)row[TSDB_SHOW_DB_QUORUM_INDEX]);
|
||||||
|
@ -1964,7 +1964,7 @@ static void printfDbInfoForQueryToFile(
|
||||||
fprintf(fp, "================ database[%d] ================\n", index);
|
fprintf(fp, "================ database[%d] ================\n", index);
|
||||||
fprintf(fp, "name: %s\n", dbInfos->name);
|
fprintf(fp, "name: %s\n", dbInfos->name);
|
||||||
fprintf(fp, "created_time: %s\n", dbInfos->create_time);
|
fprintf(fp, "created_time: %s\n", dbInfos->create_time);
|
||||||
fprintf(fp, "ntables: %"PRIu64"\n", dbInfos->ntables);
|
fprintf(fp, "ntables: %"PRId64"\n", dbInfos->ntables);
|
||||||
fprintf(fp, "vgroups: %d\n", dbInfos->vgroups);
|
fprintf(fp, "vgroups: %d\n", dbInfos->vgroups);
|
||||||
fprintf(fp, "replica: %d\n", dbInfos->replica);
|
fprintf(fp, "replica: %d\n", dbInfos->replica);
|
||||||
fprintf(fp, "quorum: %d\n", dbInfos->quorum);
|
fprintf(fp, "quorum: %d\n", dbInfos->quorum);
|
||||||
|
@ -2783,7 +2783,7 @@ static int createDatabasesAndStables() {
|
||||||
|
|
||||||
int validStbCount = 0;
|
int validStbCount = 0;
|
||||||
|
|
||||||
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
||||||
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
|
sprintf(command, "describe %s.%s;", g_Dbs.db[i].dbName,
|
||||||
g_Dbs.db[i].superTbls[j].sTblName);
|
g_Dbs.db[i].superTbls[j].sTblName);
|
||||||
verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command);
|
verbosePrint("%s() %d command: %s\n", __func__, __LINE__, command);
|
||||||
|
@ -2795,7 +2795,7 @@ static int createDatabasesAndStables() {
|
||||||
&g_Dbs.db[i].superTbls[j]);
|
&g_Dbs.db[i].superTbls[j]);
|
||||||
|
|
||||||
if (0 != ret) {
|
if (0 != ret) {
|
||||||
errorPrint("create super table %d failed!\n\n", j);
|
errorPrint("create super table %"PRIu64" failed!\n\n", j);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2823,7 +2823,7 @@ static void* createTable(void *sarg)
|
||||||
threadInfo *pThreadInfo = (threadInfo *)sarg;
|
threadInfo *pThreadInfo = (threadInfo *)sarg;
|
||||||
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
||||||
|
|
||||||
int64_t lastPrintTime = taosGetTimestampMs();
|
uint64_t lastPrintTime = taosGetTimestampMs();
|
||||||
|
|
||||||
int buff_len;
|
int buff_len;
|
||||||
buff_len = BUFFER_SIZE / 8;
|
buff_len = BUFFER_SIZE / 8;
|
||||||
|
@ -2898,7 +2898,7 @@ static void* createTable(void *sarg)
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t currentPrintTime = taosGetTimestampMs();
|
uint64_t currentPrintTime = taosGetTimestampMs();
|
||||||
if (currentPrintTime - lastPrintTime > 30*1000) {
|
if (currentPrintTime - lastPrintTime > 30*1000) {
|
||||||
printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n",
|
printf("thread[%d] already create %"PRIu64" - %"PRIu64" tables\n",
|
||||||
pThreadInfo->threadID, pThreadInfo->start_table_from, i);
|
pThreadInfo->threadID, pThreadInfo->start_table_from, i);
|
||||||
|
@ -2918,7 +2918,7 @@ static void* createTable(void *sarg)
|
||||||
}
|
}
|
||||||
|
|
||||||
static int startMultiThreadCreateChildTable(
|
static int startMultiThreadCreateChildTable(
|
||||||
char* cols, int threads, uint64_t startFrom, uint64_t ntables,
|
char* cols, int threads, uint64_t startFrom, int64_t ntables,
|
||||||
char* db_name, SSuperTable* superTblInfo) {
|
char* db_name, SSuperTable* superTblInfo) {
|
||||||
|
|
||||||
pthread_t *pids = malloc(threads * sizeof(pthread_t));
|
pthread_t *pids = malloc(threads * sizeof(pthread_t));
|
||||||
|
@ -2933,16 +2933,16 @@ static int startMultiThreadCreateChildTable(
|
||||||
threads = 1;
|
threads = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t a = ntables / threads;
|
int64_t a = ntables / threads;
|
||||||
if (a < 1) {
|
if (a < 1) {
|
||||||
threads = ntables;
|
threads = ntables;
|
||||||
a = 1;
|
a = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t b = 0;
|
int64_t b = 0;
|
||||||
b = ntables % threads;
|
b = ntables % threads;
|
||||||
|
|
||||||
for (int64_t i = 0; i < threads; i++) {
|
for (int i = 0; i < threads; i++) {
|
||||||
threadInfo *t_info = infos + i;
|
threadInfo *t_info = infos + i;
|
||||||
t_info->threadID = i;
|
t_info->threadID = i;
|
||||||
tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE);
|
tstrncpy(t_info->db_name, db_name, MAX_DB_NAME_SIZE);
|
||||||
|
@ -2995,7 +2995,7 @@ static void createChildTables() {
|
||||||
if (g_Dbs.use_metric) {
|
if (g_Dbs.use_metric) {
|
||||||
if (g_Dbs.db[i].superTblCount > 0) {
|
if (g_Dbs.db[i].superTblCount > 0) {
|
||||||
// with super table
|
// with super table
|
||||||
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
||||||
if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable)
|
if ((AUTO_CREATE_SUBTBL == g_Dbs.db[i].superTbls[j].autoCreateTable)
|
||||||
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
|
|| (TBL_ALREADY_EXISTS == g_Dbs.db[i].superTbls[j].childTblExists)) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -3006,7 +3006,7 @@ static void createChildTables() {
|
||||||
int startFrom = 0;
|
int startFrom = 0;
|
||||||
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
|
g_totalChildTables += g_Dbs.db[i].superTbls[j].childTblCount;
|
||||||
|
|
||||||
verbosePrint("%s() LN%d: create %d child tables from %d\n",
|
verbosePrint("%s() LN%d: create %"PRIu64" child tables from %d\n",
|
||||||
__func__, __LINE__, g_totalChildTables, startFrom);
|
__func__, __LINE__, g_totalChildTables, startFrom);
|
||||||
startMultiThreadCreateChildTable(
|
startMultiThreadCreateChildTable(
|
||||||
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
|
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable,
|
||||||
|
@ -4555,7 +4555,7 @@ static void prepareSampleData() {
|
||||||
static void postFreeResource() {
|
static void postFreeResource() {
|
||||||
tmfclose(g_fpOfInsertResult);
|
tmfclose(g_fpOfInsertResult);
|
||||||
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
||||||
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
||||||
if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) {
|
if (0 != g_Dbs.db[i].superTbls[j].colsOfCreateChildTable) {
|
||||||
free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
|
free(g_Dbs.db[i].superTbls[j].colsOfCreateChildTable);
|
||||||
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL;
|
g_Dbs.db[i].superTbls[j].colsOfCreateChildTable = NULL;
|
||||||
|
@ -4760,19 +4760,21 @@ static int prepareSampleDataForSTable(SSuperTable *superTblInfo) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k)
|
static int64_t execInsert(threadInfo *pThreadInfo, uint64_t k)
|
||||||
{
|
{
|
||||||
int affectedRows;
|
int affectedRows;
|
||||||
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
||||||
|
|
||||||
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
|
verbosePrint("[%d] %s() LN%d %s\n", pThreadInfo->threadID,
|
||||||
__func__, __LINE__, buffer);
|
__func__, __LINE__, pThreadInfo->buffer);
|
||||||
if (superTblInfo) {
|
if (superTblInfo) {
|
||||||
if (superTblInfo->insertMode == TAOSC_IFACE) {
|
if (superTblInfo->insertMode == TAOSC_IFACE) {
|
||||||
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false);
|
affectedRows = queryDbExec(
|
||||||
|
pThreadInfo->taos,
|
||||||
|
pThreadInfo->buffer, INSERT_TYPE, false);
|
||||||
} else if (superTblInfo->insertMode == REST_IFACE) {
|
} else if (superTblInfo->insertMode == REST_IFACE) {
|
||||||
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port,
|
if (0 != postProceSql(g_Dbs.host, &g_Dbs.serv_addr, g_Dbs.port,
|
||||||
buffer, NULL /* not set result file */)) {
|
pThreadInfo->buffer, NULL /* not set result file */)) {
|
||||||
affectedRows = -1;
|
affectedRows = -1;
|
||||||
printf("========restful return fail, threadID[%d]\n",
|
printf("========restful return fail, threadID[%d]\n",
|
||||||
pThreadInfo->threadID);
|
pThreadInfo->threadID);
|
||||||
|
@ -4780,17 +4782,19 @@ static int64_t execInsert(threadInfo *pThreadInfo, char *buffer, uint64_t k)
|
||||||
affectedRows = k;
|
affectedRows = k;
|
||||||
}
|
}
|
||||||
} else if (superTblInfo->insertMode == STMT_IFACE) {
|
} else if (superTblInfo->insertMode == STMT_IFACE) {
|
||||||
// TODO: add stmt support
|
debugPrint("%s() LN%d, stmt=%p", __func__, __LINE__, pThreadInfo->stmt);
|
||||||
errorPrint("%s() LN%d, %s\n",
|
if (0 != taos_stmt_execute(pThreadInfo->stmt)) {
|
||||||
__func__, __LINE__, "!!! need support stmt here");
|
errorPrint("%s() LN%d, failied to execute insert statement\n",
|
||||||
exit(-1);
|
__func__, __LINE__);
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
errorPrint("%s() LN%d: unknown insert mode: %d\n",
|
errorPrint("%s() LN%d: unknown insert mode: %d\n",
|
||||||
__func__, __LINE__, superTblInfo->insertMode);
|
__func__, __LINE__, superTblInfo->insertMode);
|
||||||
affectedRows = 0;
|
affectedRows = 0;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
affectedRows = queryDbExec(pThreadInfo->taos, buffer, INSERT_TYPE, false);
|
affectedRows = queryDbExec(pThreadInfo->taos, pThreadInfo->buffer, INSERT_TYPE, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
return affectedRows;
|
return affectedRows;
|
||||||
|
@ -5145,8 +5149,8 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
|
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
|
||||||
char* buffer = calloc(maxSqlLen, 1);
|
pThreadInfo->buffer = calloc(maxSqlLen, 1);
|
||||||
if (NULL == buffer) {
|
if (NULL == pThreadInfo->buffer) {
|
||||||
errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n",
|
errorPrint( "%s() LN%d, Failed to alloc %"PRIu64" Bytes, reason:%s\n",
|
||||||
__func__, __LINE__, maxSqlLen, strerror(errno));
|
__func__, __LINE__, maxSqlLen, strerror(errno));
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -5170,7 +5174,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
|
|
||||||
uint64_t tableSeq = pThreadInfo->start_table_from;
|
uint64_t tableSeq = pThreadInfo->start_table_from;
|
||||||
|
|
||||||
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRIu64" insertRows=%"PRIu64"\n",
|
debugPrint("[%d] %s() LN%d: start_table_from=%"PRIu64" ntables=%"PRId64" insertRows=%"PRIu64"\n",
|
||||||
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->start_table_from,
|
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->start_table_from,
|
||||||
pThreadInfo->ntables, insertRows);
|
pThreadInfo->ntables, insertRows);
|
||||||
|
|
||||||
|
@ -5201,10 +5205,10 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
flagSleep = false;
|
flagSleep = false;
|
||||||
}
|
}
|
||||||
// generate data
|
// generate data
|
||||||
memset(buffer, 0, maxSqlLen);
|
memset(pThreadInfo->buffer, 0, maxSqlLen);
|
||||||
uint64_t remainderBufLen = maxSqlLen;
|
uint64_t remainderBufLen = maxSqlLen;
|
||||||
|
|
||||||
char *pstr = buffer;
|
char *pstr = pThreadInfo->buffer;
|
||||||
|
|
||||||
int len = snprintf(pstr, nInsertBufLen + 1, "%s", strInsertInto);
|
int len = snprintf(pstr, nInsertBufLen + 1, "%s", strInsertInto);
|
||||||
pstr += len;
|
pstr += len;
|
||||||
|
@ -5217,7 +5221,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
if (0 == strlen(tableName)) {
|
if (0 == strlen(tableName)) {
|
||||||
errorPrint("[%d] %s() LN%d, getTableName return null\n",
|
errorPrint("[%d] %s() LN%d, getTableName return null\n",
|
||||||
pThreadInfo->threadID, __func__, __LINE__);
|
pThreadInfo->threadID, __func__, __LINE__);
|
||||||
free(buffer);
|
free(pThreadInfo->buffer);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5283,7 +5287,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
|
pThreadInfo->threadID, __func__, __LINE__, recOfBatch,
|
||||||
pThreadInfo->totalInsertRows);
|
pThreadInfo->totalInsertRows);
|
||||||
verbosePrint("[%d] %s() LN%d, buffer=%s\n",
|
verbosePrint("[%d] %s() LN%d, buffer=%s\n",
|
||||||
pThreadInfo->threadID, __func__, __LINE__, buffer);
|
pThreadInfo->threadID, __func__, __LINE__, pThreadInfo->buffer);
|
||||||
|
|
||||||
startTs = taosGetTimestampMs();
|
startTs = taosGetTimestampMs();
|
||||||
|
|
||||||
|
@ -5294,7 +5298,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
errorPrint("%s\n", "\tPlease check if the batch or the buffer length is proper value!\n");
|
errorPrint("%s\n", "\tPlease check if the batch or the buffer length is proper value!\n");
|
||||||
goto free_of_interlace;
|
goto free_of_interlace;
|
||||||
}
|
}
|
||||||
int64_t affectedRows = execInsert(pThreadInfo, buffer, recOfBatch);
|
int64_t affectedRows = execInsert(pThreadInfo, recOfBatch);
|
||||||
|
|
||||||
endTs = taosGetTimestampMs();
|
endTs = taosGetTimestampMs();
|
||||||
uint64_t delay = endTs - startTs;
|
uint64_t delay = endTs - startTs;
|
||||||
|
@ -5312,7 +5316,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
if (recOfBatch != affectedRows) {
|
if (recOfBatch != affectedRows) {
|
||||||
errorPrint("[%d] %s() LN%d execInsert insert %"PRIu64", affected rows: %"PRId64"\n%s\n",
|
errorPrint("[%d] %s() LN%d execInsert insert %"PRIu64", affected rows: %"PRId64"\n%s\n",
|
||||||
pThreadInfo->threadID, __func__, __LINE__,
|
pThreadInfo->threadID, __func__, __LINE__,
|
||||||
recOfBatch, affectedRows, buffer);
|
recOfBatch, affectedRows, pThreadInfo->buffer);
|
||||||
goto free_of_interlace;
|
goto free_of_interlace;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -5341,7 +5345,7 @@ static void* syncWriteInterlace(threadInfo *pThreadInfo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
free_of_interlace:
|
free_of_interlace:
|
||||||
tmfree(buffer);
|
tmfree(pThreadInfo->buffer);
|
||||||
printStatPerThread(pThreadInfo);
|
printStatPerThread(pThreadInfo);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -5360,8 +5364,8 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
SSuperTable* superTblInfo = pThreadInfo->superTblInfo;
|
||||||
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
|
uint64_t maxSqlLen = superTblInfo?superTblInfo->maxSqlLen:g_args.max_sql_len;
|
||||||
|
|
||||||
char* buffer = calloc(maxSqlLen, 1);
|
pThreadInfo->buffer = calloc(maxSqlLen, 1);
|
||||||
if (NULL == buffer) {
|
if (NULL == pThreadInfo->buffer) {
|
||||||
errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n",
|
errorPrint( "Failed to alloc %"PRIu64" Bytes, reason:%s\n",
|
||||||
maxSqlLen,
|
maxSqlLen,
|
||||||
strerror(errno));
|
strerror(errno));
|
||||||
|
@ -5407,7 +5411,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
pThreadInfo->threadID, tableSeq, tableName);
|
pThreadInfo->threadID, tableSeq, tableName);
|
||||||
|
|
||||||
int64_t remainderBufLen = maxSqlLen;
|
int64_t remainderBufLen = maxSqlLen;
|
||||||
char *pstr = buffer;
|
char *pstr = pThreadInfo->buffer;
|
||||||
int nInsertBufLen = strlen("insert into ");
|
int nInsertBufLen = strlen("insert into ");
|
||||||
|
|
||||||
int len = snprintf(pstr, nInsertBufLen + 1, "%s", "insert into ");
|
int len = snprintf(pstr, nInsertBufLen + 1, "%s", "insert into ");
|
||||||
|
@ -5430,7 +5434,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
|
|
||||||
startTs = taosGetTimestampMs();
|
startTs = taosGetTimestampMs();
|
||||||
|
|
||||||
int64_t affectedRows = execInsert(pThreadInfo, buffer, generated);
|
int64_t affectedRows = execInsert(pThreadInfo, generated);
|
||||||
|
|
||||||
endTs = taosGetTimestampMs();
|
endTs = taosGetTimestampMs();
|
||||||
uint64_t delay = endTs - startTs;
|
uint64_t delay = endTs - startTs;
|
||||||
|
@ -5489,7 +5493,7 @@ static void* syncWriteProgressive(threadInfo *pThreadInfo) {
|
||||||
} // tableSeq
|
} // tableSeq
|
||||||
|
|
||||||
free_of_progressive:
|
free_of_progressive:
|
||||||
tmfree(buffer);
|
tmfree(pThreadInfo->buffer);
|
||||||
printStatPerThread(pThreadInfo);
|
printStatPerThread(pThreadInfo);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -5697,7 +5701,7 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t ntables = 0;
|
int64_t ntables = 0;
|
||||||
uint64_t startFrom;
|
uint64_t startFrom;
|
||||||
|
|
||||||
if (superTblInfo) {
|
if (superTblInfo) {
|
||||||
|
@ -5768,13 +5772,13 @@ static void startMultiThreadInsertData(int threads, char* db_name,
|
||||||
|
|
||||||
taos_close(taos0);
|
taos_close(taos0);
|
||||||
|
|
||||||
uint64_t a = ntables / threads;
|
int64_t a = ntables / threads;
|
||||||
if (a < 1) {
|
if (a < 1) {
|
||||||
threads = ntables;
|
threads = ntables;
|
||||||
a = 1;
|
a = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t b = 0;
|
int64_t b = 0;
|
||||||
if (threads != 0) {
|
if (threads != 0) {
|
||||||
b = ntables % threads;
|
b = ntables % threads;
|
||||||
}
|
}
|
||||||
|
@ -5967,15 +5971,15 @@ static void *readTable(void *sarg) {
|
||||||
num_of_DPT = g_args.num_of_DPT;
|
num_of_DPT = g_args.num_of_DPT;
|
||||||
// }
|
// }
|
||||||
|
|
||||||
int num_of_tables = rinfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
|
int64_t num_of_tables = rinfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
|
||||||
int totalData = num_of_DPT * num_of_tables;
|
int64_t totalData = num_of_DPT * num_of_tables;
|
||||||
bool do_aggreFunc = g_Dbs.do_aggreFunc;
|
bool do_aggreFunc = g_Dbs.do_aggreFunc;
|
||||||
|
|
||||||
int n = do_aggreFunc ? (sizeof(aggreFunc) / sizeof(aggreFunc[0])) : 2;
|
int n = do_aggreFunc ? (sizeof(aggreFunc) / sizeof(aggreFunc[0])) : 2;
|
||||||
if (!do_aggreFunc) {
|
if (!do_aggreFunc) {
|
||||||
printf("\nThe first field is either Binary or Bool. Aggregation functions are not supported.\n");
|
printf("\nThe first field is either Binary or Bool. Aggregation functions are not supported.\n");
|
||||||
}
|
}
|
||||||
printf("%d records:\n", totalData);
|
printf("%"PRId64" records:\n", totalData);
|
||||||
fprintf(fp, "| QFunctions | QRecords | QSpeed(R/s) | QLatency(ms) |\n");
|
fprintf(fp, "| QFunctions | QRecords | QSpeed(R/s) | QLatency(ms) |\n");
|
||||||
|
|
||||||
for (uint64_t j = 0; j < n; j++) {
|
for (uint64_t j = 0; j < n; j++) {
|
||||||
|
@ -6007,7 +6011,7 @@ static void *readTable(void *sarg) {
|
||||||
taos_free_result(pSql);
|
taos_free_result(pSql);
|
||||||
}
|
}
|
||||||
|
|
||||||
fprintf(fp, "|%10s | %10d | %12.2f | %10.2f |\n",
|
fprintf(fp, "|%10s | %"PRIu64" | %12.2f | %10.2f |\n",
|
||||||
aggreFunc[j][0] == '*' ? " * " : aggreFunc[j], totalData,
|
aggreFunc[j][0] == '*' ? " * " : aggreFunc[j], totalData,
|
||||||
(double)(num_of_tables * num_of_DPT) / totalT, totalT * 1000);
|
(double)(num_of_tables * num_of_DPT) / totalT, totalT * 1000);
|
||||||
printf("select %10s took %.6f second(s)\n", aggreFunc[j], totalT * 1000);
|
printf("select %10s took %.6f second(s)\n", aggreFunc[j], totalT * 1000);
|
||||||
|
@ -6030,16 +6034,16 @@ static void *readMetric(void *sarg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int num_of_DPT = rinfo->superTblInfo->insertRows;
|
int num_of_DPT = rinfo->superTblInfo->insertRows;
|
||||||
int num_of_tables = rinfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
|
int64_t num_of_tables = rinfo->ntables; // rinfo->end_table_to - rinfo->start_table_from + 1;
|
||||||
int totalData = num_of_DPT * num_of_tables;
|
int64_t totalData = num_of_DPT * num_of_tables;
|
||||||
bool do_aggreFunc = g_Dbs.do_aggreFunc;
|
bool do_aggreFunc = g_Dbs.do_aggreFunc;
|
||||||
|
|
||||||
int n = do_aggreFunc ? (sizeof(aggreFunc) / sizeof(aggreFunc[0])) : 2;
|
int n = do_aggreFunc ? (sizeof(aggreFunc) / sizeof(aggreFunc[0])) : 2;
|
||||||
if (!do_aggreFunc) {
|
if (!do_aggreFunc) {
|
||||||
printf("\nThe first field is either Binary or Bool. Aggregation functions are not supported.\n");
|
printf("\nThe first field is either Binary or Bool. Aggregation functions are not supported.\n");
|
||||||
}
|
}
|
||||||
printf("%d records:\n", totalData);
|
printf("%"PRId64" records:\n", totalData);
|
||||||
fprintf(fp, "Querying On %d records:\n", totalData);
|
fprintf(fp, "Querying On %"PRId64" records:\n", totalData);
|
||||||
|
|
||||||
for (int j = 0; j < n; j++) {
|
for (int j = 0; j < n; j++) {
|
||||||
char condition[COND_BUF_LEN] = "\0";
|
char condition[COND_BUF_LEN] = "\0";
|
||||||
|
@ -6137,11 +6141,11 @@ static int insertTestProcess() {
|
||||||
end = taosGetTimestampMs();
|
end = taosGetTimestampMs();
|
||||||
|
|
||||||
if (g_totalChildTables > 0) {
|
if (g_totalChildTables > 0) {
|
||||||
fprintf(stderr, "Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
|
fprintf(stderr, "Spent %.4f seconds to create %"PRIu64" tables with %d thread(s)\n\n",
|
||||||
(end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
|
(end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
|
||||||
if (g_fpOfInsertResult) {
|
if (g_fpOfInsertResult) {
|
||||||
fprintf(g_fpOfInsertResult,
|
fprintf(g_fpOfInsertResult,
|
||||||
"Spent %.4f seconds to create %d tables with %d thread(s)\n\n",
|
"Spent %.4f seconds to create %"PRIu64" tables with %d thread(s)\n\n",
|
||||||
(end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
|
(end - start)/1000.0, g_totalChildTables, g_Dbs.threadCountByCreateTbl);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -6152,7 +6156,7 @@ static int insertTestProcess() {
|
||||||
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
for (int i = 0; i < g_Dbs.dbCount; i++) {
|
||||||
if (g_Dbs.use_metric) {
|
if (g_Dbs.use_metric) {
|
||||||
if (g_Dbs.db[i].superTblCount > 0) {
|
if (g_Dbs.db[i].superTblCount > 0) {
|
||||||
for (int j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
for (uint64_t j = 0; j < g_Dbs.db[i].superTblCount; j++) {
|
||||||
|
|
||||||
SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j];
|
SSuperTable* superTblInfo = &g_Dbs.db[i].superTbls[j];
|
||||||
|
|
||||||
|
@ -6467,16 +6471,16 @@ static int queryTestProcess() {
|
||||||
ERROR_EXIT("memory allocation failed for create threads\n");
|
ERROR_EXIT("memory allocation failed for create threads\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
|
int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
|
||||||
int threads = g_queryInfo.superQueryInfo.threadCnt;
|
int threads = g_queryInfo.superQueryInfo.threadCnt;
|
||||||
|
|
||||||
uint64_t a = ntables / threads;
|
int64_t a = ntables / threads;
|
||||||
if (a < 1) {
|
if (a < 1) {
|
||||||
threads = ntables;
|
threads = ntables;
|
||||||
a = 1;
|
a = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t b = 0;
|
int64_t b = 0;
|
||||||
if (threads != 0) {
|
if (threads != 0) {
|
||||||
b = ntables % threads;
|
b = ntables % threads;
|
||||||
}
|
}
|
||||||
|
@ -6815,16 +6819,16 @@ static int subscribeTestProcess() {
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
|
int64_t ntables = g_queryInfo.superQueryInfo.childTblCount;
|
||||||
int threads = g_queryInfo.superQueryInfo.threadCnt;
|
int threads = g_queryInfo.superQueryInfo.threadCnt;
|
||||||
|
|
||||||
uint64_t a = ntables / threads;
|
int64_t a = ntables / threads;
|
||||||
if (a < 1) {
|
if (a < 1) {
|
||||||
threads = ntables;
|
threads = ntables;
|
||||||
a = 1;
|
a = 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t b = 0;
|
int64_t b = 0;
|
||||||
if (threads != 0) {
|
if (threads != 0) {
|
||||||
b = ntables % threads;
|
b = ntables % threads;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue