Test/sangshuduo/td 13408 move tests in for3.0 (#10603)
* restore .gitmodules * Revert "[TD-13408]<test>: move tests out" This reverts commit f80a4ca49ff37431bc0bc0dafb5ccf5858b00beb. * revert f80a4ca49ff37431bc0bc0dafb5ccf5858b00beb * immigrate file change from stand-alone repo to TDengine for 3.0 * remove tests repository for Jenkinsfile2 * remove tests/test from .gitignore Co-authored-by: tangfangzhi <fztang@taosdata.com>
This commit is contained in:
parent
43feda0595
commit
611a2c629b
|
@ -24,7 +24,6 @@ mac/
|
|||
*.orig
|
||||
src/connector/nodejs/node_modules/
|
||||
src/connector/nodejs/out/
|
||||
tests/test/
|
||||
tests/taoshebei/
|
||||
tests/taoscsv/
|
||||
tests/taosdalipu/
|
||||
|
|
|
@ -0,0 +1,706 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
// clang-format off
|
||||
|
||||
#include <assert.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <unistd.h>
|
||||
#include <stdlib.h>
|
||||
#include <dirent.h>
|
||||
|
||||
#include "taos.h"
|
||||
#include "taoserror.h"
|
||||
#include "tlog.h"
|
||||
|
||||
#define GREEN "\033[1;32m"
|
||||
#define NC "\033[0m"
|
||||
#define min(a, b) (((a) < (b)) ? (a) : (b))
|
||||
|
||||
#define MAX_SQL_STR_LEN (1024 * 1024)
|
||||
#define MAX_ROW_STR_LEN (16 * 1024)
|
||||
|
||||
enum _RUN_MODE {
|
||||
TMQ_RUN_INSERT_AND_CONSUME,
|
||||
TMQ_RUN_ONLY_INSERT,
|
||||
TMQ_RUN_ONLY_CONSUME,
|
||||
TMQ_RUN_MODE_BUTT
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
char dbName[32];
|
||||
char stbName[64];
|
||||
char resultFileName[256];
|
||||
char vnodeWalPath[256];
|
||||
int32_t numOfThreads;
|
||||
int32_t numOfTables;
|
||||
int32_t numOfVgroups;
|
||||
int32_t runMode;
|
||||
int32_t numOfColumn;
|
||||
double ratio;
|
||||
int32_t batchNumOfRow;
|
||||
int32_t totalRowsOfPerTbl;
|
||||
int64_t startTimestamp;
|
||||
int32_t showMsgFlag;
|
||||
|
||||
int32_t totalRowsOfT2;
|
||||
} SConfInfo;
|
||||
|
||||
static SConfInfo g_stConfInfo = {
|
||||
"tmqdb",
|
||||
"stb",
|
||||
"./tmqResult.txt", // output_file
|
||||
"/data2/dnode/data/vnode/vnode2/wal",
|
||||
1, // threads
|
||||
1, // tables
|
||||
1, // vgroups
|
||||
0, // run mode
|
||||
1, // columns
|
||||
1, // ratio
|
||||
1, // batch size
|
||||
10000, // total rows for per table
|
||||
0, // 2020-01-01 00:00:00.000
|
||||
0, // show consume msg switch
|
||||
10000,
|
||||
};
|
||||
|
||||
char* g_pRowValue = NULL;
|
||||
TdFilePtr g_fp = NULL;
|
||||
|
||||
static void printHelp() {
|
||||
char indent[10] = " ";
|
||||
printf("Used to test the performance while create table\n");
|
||||
|
||||
printf("%s%s\n", indent, "-c");
|
||||
printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir);
|
||||
printf("%s%s\n", indent, "-d");
|
||||
printf("%s%s%s%s\n", indent, indent, "The name of the database to be created, default is ", g_stConfInfo.dbName);
|
||||
printf("%s%s\n", indent, "-s");
|
||||
printf("%s%s%s%s\n", indent, indent, "The name of the super table to be created, default is ", g_stConfInfo.stbName);
|
||||
printf("%s%s\n", indent, "-f");
|
||||
printf("%s%s%s%s\n", indent, indent, "The file of result, default is ", g_stConfInfo.resultFileName);
|
||||
printf("%s%s\n", indent, "-w");
|
||||
printf("%s%s%s%s\n", indent, indent, "The path of vnode of wal, default is ", g_stConfInfo.vnodeWalPath);
|
||||
printf("%s%s\n", indent, "-t");
|
||||
printf("%s%s%s%d\n", indent, indent, "numOfThreads, default is ", g_stConfInfo.numOfThreads);
|
||||
printf("%s%s\n", indent, "-n");
|
||||
printf("%s%s%s%d\n", indent, indent, "numOfTables, default is ", g_stConfInfo.numOfTables);
|
||||
printf("%s%s\n", indent, "-v");
|
||||
printf("%s%s%s%d\n", indent, indent, "numOfVgroups, default is ", g_stConfInfo.numOfVgroups);
|
||||
printf("%s%s\n", indent, "-a");
|
||||
printf("%s%s%s%d\n", indent, indent, "runMode, default is ", g_stConfInfo.runMode);
|
||||
printf("%s%s\n", indent, "-l");
|
||||
printf("%s%s%s%d\n", indent, indent, "numOfColumn, default is ", g_stConfInfo.numOfColumn);
|
||||
printf("%s%s\n", indent, "-q");
|
||||
printf("%s%s%s%f\n", indent, indent, "ratio, default is ", g_stConfInfo.ratio);
|
||||
printf("%s%s\n", indent, "-b");
|
||||
printf("%s%s%s%d\n", indent, indent, "batchNumOfRow, default is ", g_stConfInfo.batchNumOfRow);
|
||||
printf("%s%s\n", indent, "-r");
|
||||
printf("%s%s%s%d\n", indent, indent, "totalRowsOfPerTbl, default is ", g_stConfInfo.totalRowsOfPerTbl);
|
||||
printf("%s%s\n", indent, "-m");
|
||||
printf("%s%s%s%" PRId64 "\n", indent, indent, "startTimestamp, default is ", g_stConfInfo.startTimestamp);
|
||||
printf("%s%s\n", indent, "-g");
|
||||
printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
|
||||
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
|
||||
void parseArgument(int32_t argc, char *argv[]) {
|
||||
|
||||
g_stConfInfo.startTimestamp = 1640966400000; // 2020-01-01 00:00:00.000
|
||||
|
||||
|
||||
for (int32_t i = 1; i < argc; i++) {
|
||||
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
|
||||
printHelp();
|
||||
exit(0);
|
||||
} else if (strcmp(argv[i], "-d") == 0) {
|
||||
strcpy(g_stConfInfo.dbName, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-c") == 0) {
|
||||
strcpy(configDir, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-s") == 0) {
|
||||
strcpy(g_stConfInfo.stbName, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-w") == 0) {
|
||||
strcpy(g_stConfInfo.vnodeWalPath, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-f") == 0) {
|
||||
strcpy(g_stConfInfo.resultFileName, argv[++i]);
|
||||
} else if (strcmp(argv[i], "-t") == 0) {
|
||||
g_stConfInfo.numOfThreads = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-n") == 0) {
|
||||
g_stConfInfo.numOfTables = atoll(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-v") == 0) {
|
||||
g_stConfInfo.numOfVgroups = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-a") == 0) {
|
||||
g_stConfInfo.runMode = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-b") == 0) {
|
||||
g_stConfInfo.batchNumOfRow = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-r") == 0) {
|
||||
g_stConfInfo.totalRowsOfPerTbl = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-l") == 0) {
|
||||
g_stConfInfo.numOfColumn = atoi(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-q") == 0) {
|
||||
g_stConfInfo.ratio = atof(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-m") == 0) {
|
||||
g_stConfInfo.startTimestamp = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-g") == 0) {
|
||||
g_stConfInfo.showMsgFlag = atol(argv[++i]);
|
||||
} else {
|
||||
pPrint("%s unknow para: %s %s", GREEN, argv[++i], NC);
|
||||
exit(-1);
|
||||
}
|
||||
}
|
||||
|
||||
g_stConfInfo.totalRowsOfT2 = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.ratio;
|
||||
|
||||
pPrint("%s configDir:%s %s", GREEN, configDir, NC);
|
||||
pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
|
||||
pPrint("%s stbName:%s %s", GREEN, g_stConfInfo.stbName, NC);
|
||||
pPrint("%s resultFileName:%s %s", GREEN, g_stConfInfo.resultFileName, NC);
|
||||
pPrint("%s vnodeWalPath:%s %s", GREEN, g_stConfInfo.vnodeWalPath, NC);
|
||||
pPrint("%s numOfTables:%d %s", GREEN, g_stConfInfo.numOfTables, NC);
|
||||
pPrint("%s numOfThreads:%d %s", GREEN, g_stConfInfo.numOfThreads, NC);
|
||||
pPrint("%s numOfVgroups:%d %s", GREEN, g_stConfInfo.numOfVgroups, NC);
|
||||
pPrint("%s runMode:%d %s", GREEN, g_stConfInfo.runMode, NC);
|
||||
pPrint("%s ratio:%f %s", GREEN, g_stConfInfo.ratio, NC);
|
||||
pPrint("%s numOfColumn:%d %s", GREEN, g_stConfInfo.numOfColumn, NC);
|
||||
pPrint("%s batchNumOfRow:%d %s", GREEN, g_stConfInfo.batchNumOfRow, NC);
|
||||
pPrint("%s totalRowsOfPerTbl:%d %s", GREEN, g_stConfInfo.totalRowsOfPerTbl, NC);
|
||||
pPrint("%s totalRowsOfT2:%d %s", GREEN, g_stConfInfo.totalRowsOfT2, NC);
|
||||
pPrint("%s startTimestamp:%" PRId64" %s", GREEN, g_stConfInfo.startTimestamp, NC);
|
||||
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
|
||||
}
|
||||
|
||||
static int running = 1;
|
||||
static void msg_process(tmq_message_t* message) { tmqShowMsg(message); }
|
||||
|
||||
// calc dir size (not include itself 4096Byte)
|
||||
int64_t getDirectorySize(char *dir)
|
||||
{
|
||||
DIR *dp;
|
||||
struct dirent *entry;
|
||||
int64_t totalSize=0;
|
||||
|
||||
if ((dp = opendir(dir)) == NULL) {
|
||||
fprintf(stderr, "Cannot open dir: %s\n", dir);
|
||||
return -1;
|
||||
}
|
||||
|
||||
//lstat(dir, &statbuf);
|
||||
//totalSize+=statbuf.st_size;
|
||||
|
||||
while ((entry = readdir(dp)) != NULL) {
|
||||
char subdir[1024];
|
||||
sprintf(subdir, "%s/%s", dir, entry->d_name);
|
||||
|
||||
//printf("===d_name: %s\n", entry->d_name);
|
||||
if (taosIsDir(subdir)) {
|
||||
if (strcmp(".", entry->d_name) == 0 || strcmp("..", entry->d_name) == 0) {
|
||||
continue;
|
||||
}
|
||||
|
||||
int64_t subDirSize = getDirectorySize(subdir);
|
||||
totalSize+=subDirSize;
|
||||
} else if (0 == strcmp(strchr(entry->d_name, '.'), ".log")) { // only calc .log file size, and not include .idx file
|
||||
int64_t file_size = 0;
|
||||
taosStatFile(subdir, &file_size, NULL);
|
||||
totalSize+=file_size;
|
||||
}
|
||||
}
|
||||
|
||||
closedir(dp);
|
||||
return totalSize;
|
||||
}
|
||||
|
||||
|
||||
int queryDB(TAOS *taos, char *command) {
|
||||
TAOS_RES *pRes = taos_query(taos, command);
|
||||
int code = taos_errno(pRes);
|
||||
//if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) {
|
||||
if (code != 0) {
|
||||
pError("failed to reason:%s, sql: %s", tstrerror(code), command);
|
||||
taos_free_result(pRes);
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
return 0 ;
|
||||
}
|
||||
|
||||
int32_t init_env() {
|
||||
char sqlStr[1024] = {0};
|
||||
|
||||
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
sprintf(sqlStr, "create database if not exists %s vgroups %d", g_stConfInfo.dbName, g_stConfInfo.numOfVgroups);
|
||||
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in create db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
|
||||
pRes = taos_query(pConn, sqlStr);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
// create row value
|
||||
g_pRowValue = (char*)calloc(1, g_stConfInfo.numOfColumn * 16 + 128);
|
||||
if (NULL == g_pRowValue) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t dataLen = 0;
|
||||
int32_t sqlLen = 0;
|
||||
sqlLen += sprintf(sqlStr+sqlLen, "create stable if not exists %s (ts timestamp, ", g_stConfInfo.stbName);
|
||||
for (int32_t i = 0; i < g_stConfInfo.numOfColumn; i++) {
|
||||
if (i == g_stConfInfo.numOfColumn - 1) {
|
||||
sqlLen += sprintf(sqlStr+sqlLen, "c%d int) ", i);
|
||||
memcpy(g_pRowValue + dataLen, "66778899", strlen("66778899"));
|
||||
dataLen += strlen("66778899");
|
||||
} else {
|
||||
sqlLen += sprintf(sqlStr+sqlLen, "c%d int, ", i);
|
||||
memcpy(g_pRowValue + dataLen, "66778899, ", strlen("66778899, "));
|
||||
dataLen += strlen("66778899, ");
|
||||
}
|
||||
}
|
||||
sqlLen += sprintf(sqlStr+sqlLen, "tags (t0 int)");
|
||||
|
||||
pRes = taos_query(pConn, sqlStr);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create super table %s, reason:%s\n", g_stConfInfo.stbName, taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
for (int32_t i = 0; i < g_stConfInfo.numOfTables; i++) {
|
||||
sprintf(sqlStr, "create table if not exists %s%d using %s tags(1)", g_stConfInfo.stbName, i, g_stConfInfo.stbName);
|
||||
pRes = taos_query(pConn, sqlStr);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create child table %s%d, reason:%s\n", g_stConfInfo.stbName, i, taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
}
|
||||
|
||||
//const char* sql = "select * from tu1";
|
||||
sprintf(sqlStr, "select * from %s%d", g_stConfInfo.stbName, 0);
|
||||
pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr));
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
taos_close(pConn);
|
||||
return 0;
|
||||
}
|
||||
|
||||
tmq_t* build_consumer() {
|
||||
char sqlStr[1024] = {0};
|
||||
|
||||
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
|
||||
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
tmq_conf_set(conf, "group.id", "tg2");
|
||||
tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0);
|
||||
return tmq;
|
||||
}
|
||||
|
||||
tmq_list_t* build_topic_list() {
|
||||
tmq_list_t* topic_list = tmq_list_new();
|
||||
tmq_list_append(topic_list, "test_stb_topic_1");
|
||||
return topic_list;
|
||||
}
|
||||
|
||||
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||
tmq_resp_err_t err;
|
||||
|
||||
if ((err = tmq_subscribe(tmq, topics))) {
|
||||
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
|
||||
printf("subscribe err\n");
|
||||
return;
|
||||
}
|
||||
int32_t cnt = 0;
|
||||
/*clock_t startTime = clock();*/
|
||||
while (running) {
|
||||
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
|
||||
if (tmqmessage) {
|
||||
cnt++;
|
||||
msg_process(tmqmessage);
|
||||
tmq_message_destroy(tmqmessage);
|
||||
/*} else {*/
|
||||
/*break;*/
|
||||
}
|
||||
}
|
||||
/*clock_t endTime = clock();*/
|
||||
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
|
||||
|
||||
err = tmq_consumer_close(tmq);
|
||||
if (err)
|
||||
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
|
||||
else
|
||||
fprintf(stderr, "%% Consumer closed\n");
|
||||
}
|
||||
|
||||
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
||||
static const int MIN_COMMIT_COUNT = 1000;
|
||||
|
||||
int msg_count = 0;
|
||||
tmq_resp_err_t err;
|
||||
|
||||
if ((err = tmq_subscribe(tmq, topics))) {
|
||||
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
|
||||
return;
|
||||
}
|
||||
|
||||
while (running) {
|
||||
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
|
||||
if (tmqmessage) {
|
||||
msg_process(tmqmessage);
|
||||
tmq_message_destroy(tmqmessage);
|
||||
|
||||
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
|
||||
}
|
||||
}
|
||||
|
||||
err = tmq_consumer_close(tmq);
|
||||
if (err)
|
||||
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
|
||||
else
|
||||
fprintf(stderr, "%% Consumer closed\n");
|
||||
}
|
||||
|
||||
void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLogSize) {
|
||||
tmq_resp_err_t err;
|
||||
|
||||
if ((err = tmq_subscribe(tmq, topics))) {
|
||||
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
|
||||
printf("subscribe err\n");
|
||||
return;
|
||||
}
|
||||
int32_t batchCnt = 0;
|
||||
int32_t skipLogNum = 0;
|
||||
int64_t startTime = taosGetTimestampUs();
|
||||
while (running) {
|
||||
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500);
|
||||
if (tmqmessage) {
|
||||
batchCnt++;
|
||||
skipLogNum += tmqGetSkipLogNum(tmqmessage);
|
||||
if (0 != g_stConfInfo.showMsgFlag) {
|
||||
msg_process(tmqmessage);
|
||||
}
|
||||
tmq_message_destroy(tmqmessage);
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
int64_t endTime = taosGetTimestampUs();
|
||||
double consumeTime = (double)(endTime - startTime) / 1000000;
|
||||
|
||||
if (batchCnt != totalMsgs) {
|
||||
pPrint("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC);
|
||||
}
|
||||
|
||||
pPrint("consume result: msgs: %d, skip log cnt: %d, time used:%.3f second\n", batchCnt, skipLogNum, consumeTime);
|
||||
taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.2f| %10.2f |\n", batchCnt, consumeTime, (double)batchCnt / consumeTime, (double)walLogSize / (1024 * 1024.0) / consumeTime, (double)walLogSize / 1024.0 / batchCnt);
|
||||
|
||||
err = tmq_consumer_close(tmq);
|
||||
if (err) {
|
||||
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
|
||||
}
|
||||
}
|
||||
|
||||
// sync insertion
|
||||
int32_t syncWriteData() {
|
||||
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
char sqlStr[1024] = {0};
|
||||
sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
|
||||
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
char* buffer = NULL;
|
||||
buffer = (char*)malloc(MAX_SQL_STR_LEN);
|
||||
if (NULL == buffer) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t totalMsgs = 0;
|
||||
|
||||
int64_t time_counter = g_stConfInfo.startTimestamp;
|
||||
for (int i = 0; i < g_stConfInfo.totalRowsOfPerTbl;) {
|
||||
for (int tID = 0; tID <= g_stConfInfo.numOfTables - 1; tID++) {
|
||||
int inserted = i;
|
||||
int64_t tmp_time = time_counter;
|
||||
|
||||
int32_t data_len = 0;
|
||||
data_len += sprintf(buffer + data_len, "insert into %s%d values", g_stConfInfo.stbName, tID);
|
||||
int k;
|
||||
for (k = 0; k < g_stConfInfo.batchNumOfRow;) {
|
||||
data_len += sprintf(buffer + data_len, "(%" PRId64 ", %s) ", tmp_time++, g_pRowValue);
|
||||
inserted++;
|
||||
k++;
|
||||
|
||||
if (inserted >= g_stConfInfo.totalRowsOfPerTbl) {
|
||||
break;
|
||||
}
|
||||
|
||||
if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int code = queryDB(pConn, buffer);
|
||||
if (0 != code){
|
||||
fprintf(stderr, "insert data error!\n");
|
||||
tfree(buffer);
|
||||
return -1;
|
||||
}
|
||||
|
||||
totalMsgs++;
|
||||
|
||||
if (tID == g_stConfInfo.numOfTables - 1) {
|
||||
i = inserted;
|
||||
time_counter = tmp_time;
|
||||
}
|
||||
}
|
||||
}
|
||||
tfree(buffer);
|
||||
return totalMsgs;
|
||||
}
|
||||
|
||||
|
||||
// sync insertion
|
||||
int32_t syncWriteDataByRatio() {
|
||||
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||
if (pConn == NULL) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
char sqlStr[1024] = {0};
|
||||
sprintf(sqlStr, "use %s", g_stConfInfo.dbName);
|
||||
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("error in use db, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
char* buffer = NULL;
|
||||
buffer = (char*)malloc(MAX_SQL_STR_LEN);
|
||||
if (NULL == buffer) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t totalMsgs = 0;
|
||||
|
||||
int32_t insertedOfT1 = 0;
|
||||
int32_t insertedOfT2 = 0;
|
||||
|
||||
int64_t tsOfT1 = g_stConfInfo.startTimestamp;
|
||||
int64_t tsOfT2 = g_stConfInfo.startTimestamp;
|
||||
int64_t tmp_time;
|
||||
|
||||
for (;;) {
|
||||
if ((insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) && (insertedOfT2 >= g_stConfInfo.totalRowsOfT2)) {
|
||||
break;
|
||||
}
|
||||
|
||||
for (int tID = 0; tID <= g_stConfInfo.numOfTables - 1; tID++) {
|
||||
if (0 == tID) {
|
||||
tmp_time = tsOfT1;
|
||||
if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) {
|
||||
continue;
|
||||
}
|
||||
} else if (1 == tID){
|
||||
tmp_time = tsOfT2;
|
||||
if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
int32_t data_len = 0;
|
||||
data_len += sprintf(buffer + data_len, "insert into %s%d values", g_stConfInfo.stbName, tID);
|
||||
int k;
|
||||
for (k = 0; k < g_stConfInfo.batchNumOfRow;) {
|
||||
data_len += sprintf(buffer + data_len, "(%" PRId64 ", %s) ", tmp_time++, g_pRowValue);
|
||||
k++;
|
||||
if (0 == tID) {
|
||||
insertedOfT1++;
|
||||
if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) {
|
||||
break;
|
||||
}
|
||||
} else if (1 == tID){
|
||||
insertedOfT2++;
|
||||
if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
int code = queryDB(pConn, buffer);
|
||||
if (0 != code){
|
||||
fprintf(stderr, "insert data error!\n");
|
||||
tfree(buffer);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (0 == tID) {
|
||||
tsOfT1 = tmp_time;
|
||||
} else if (1 == tID){
|
||||
tsOfT2 = tmp_time;
|
||||
}
|
||||
|
||||
totalMsgs++;
|
||||
}
|
||||
}
|
||||
pPrint("expect insert rows: T1[%d] T2[%d], actual insert rows: T1[%d] T2[%d]\n", g_stConfInfo.totalRowsOfPerTbl, g_stConfInfo.totalRowsOfT2, insertedOfT1, insertedOfT2);
|
||||
tfree(buffer);
|
||||
return totalMsgs;
|
||||
}
|
||||
|
||||
void printParaIntoFile() {
|
||||
// FILE *fp = fopen(g_stConfInfo.resultFileName, "a");
|
||||
TdFilePtr pFile = taosOpenFile(g_stConfInfo.resultFileName, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM);
|
||||
if (NULL == pFile) {
|
||||
fprintf(stderr, "Failed to open %s for save result\n", g_stConfInfo.resultFileName);
|
||||
exit -1;
|
||||
};
|
||||
g_fp = pFile;
|
||||
|
||||
time_t tTime = time(NULL);
|
||||
struct tm tm = *localtime(&tTime);
|
||||
|
||||
taosFprintfFile(pFile, "###################################################################\n");
|
||||
taosFprintfFile(pFile, "# configDir: %s\n", configDir);
|
||||
taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName);
|
||||
taosFprintfFile(pFile, "# stbName: %s\n", g_stConfInfo.stbName);
|
||||
taosFprintfFile(pFile, "# vnodeWalPath: %s\n", g_stConfInfo.vnodeWalPath);
|
||||
taosFprintfFile(pFile, "# numOfTables: %d\n", g_stConfInfo.numOfTables);
|
||||
taosFprintfFile(pFile, "# numOfThreads: %d\n", g_stConfInfo.numOfThreads);
|
||||
taosFprintfFile(pFile, "# numOfVgroups: %d\n", g_stConfInfo.numOfVgroups);
|
||||
taosFprintfFile(pFile, "# runMode: %d\n", g_stConfInfo.runMode);
|
||||
taosFprintfFile(pFile, "# ratio: %f\n", g_stConfInfo.ratio);
|
||||
taosFprintfFile(pFile, "# numOfColumn: %d\n", g_stConfInfo.numOfColumn);
|
||||
taosFprintfFile(pFile, "# batchNumOfRow: %d\n", g_stConfInfo.batchNumOfRow);
|
||||
taosFprintfFile(pFile, "# totalRowsOfPerTbl: %d\n", g_stConfInfo.totalRowsOfPerTbl);
|
||||
taosFprintfFile(pFile, "# totalRowsOfT2: %d\n", g_stConfInfo.totalRowsOfT2);
|
||||
taosFprintfFile(pFile, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
|
||||
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
|
||||
taosFprintfFile(pFile, "###################################################################\n");
|
||||
taosFprintfFile(pFile, "|-------------------------------insert info-----------------------------|--------------------------------consume info---------------------------------|\n");
|
||||
taosFprintfFile(pFile, "|batch size| insert msgs | insert time(s) | msgs/s | walLogSize(MB) | consume msgs | consume time(s) | msgs/s | MB/s | avg msg size(KB) |\n");
|
||||
taosFprintfFile(g_fp, "|%10d", g_stConfInfo.batchNumOfRow);
|
||||
}
|
||||
|
||||
int main(int32_t argc, char *argv[]) {
|
||||
parseArgument(argc, argv);
|
||||
printParaIntoFile();
|
||||
|
||||
int64_t walLogSize = 0;
|
||||
|
||||
int code;
|
||||
code = init_env();
|
||||
if (code != 0) {
|
||||
fprintf(stderr, "%% init_env error!\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t totalMsgs = 0;
|
||||
|
||||
if (g_stConfInfo.runMode != TMQ_RUN_ONLY_CONSUME) {
|
||||
|
||||
int64_t startTs = taosGetTimestampUs();
|
||||
if (1 == g_stConfInfo.ratio) {
|
||||
totalMsgs = syncWriteData();
|
||||
} else {
|
||||
totalMsgs = syncWriteDataByRatio();
|
||||
}
|
||||
|
||||
if (totalMsgs <= 0) {
|
||||
pError("inset data error!\n");
|
||||
return -1;
|
||||
}
|
||||
int64_t endTs = taosGetTimestampUs();
|
||||
int64_t delay = endTs - startTs;
|
||||
|
||||
int32_t totalRows = 0;
|
||||
if (1 == g_stConfInfo.ratio) {
|
||||
totalRows = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.numOfTables;
|
||||
} else {
|
||||
totalRows = g_stConfInfo.totalRowsOfPerTbl * (1 + g_stConfInfo.ratio);
|
||||
}
|
||||
|
||||
float seconds = delay / 1000000.0;
|
||||
float rowsSpeed = totalRows / seconds;
|
||||
float msgsSpeed = totalMsgs / seconds;
|
||||
|
||||
walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath);
|
||||
if (walLogSize <= 0) {
|
||||
pError("vnode2/wal size incorrect!");
|
||||
} else {
|
||||
pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize/(1024 * 1024.0));
|
||||
}
|
||||
|
||||
pPrint("insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second\n", totalRows, totalMsgs, seconds, rowsSpeed, msgsSpeed);
|
||||
taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.3f ", totalMsgs, seconds, msgsSpeed, (double)walLogSize/(1024 * 1024.0));
|
||||
}
|
||||
|
||||
if (g_stConfInfo.runMode == TMQ_RUN_ONLY_INSERT) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
tmq_t* tmq = build_consumer();
|
||||
tmq_list_t* topic_list = build_topic_list();
|
||||
if ((NULL == tmq) || (NULL == topic_list)){
|
||||
return -1;
|
||||
}
|
||||
|
||||
perf_loop(tmq, topic_list, totalMsgs, walLogSize);
|
||||
|
||||
tfree(g_pRowValue);
|
||||
taosFprintfFile(g_fp, "\n");
|
||||
taosCloseFile(&g_fp);
|
||||
return 0;
|
||||
}
|
||||
|
Loading…
Reference in New Issue