diff --git a/.gitignore b/.gitignore index b9b5341b06..39c7a7ee5d 100644 --- a/.gitignore +++ b/.gitignore @@ -24,7 +24,6 @@ mac/ *.orig src/connector/nodejs/node_modules/ src/connector/nodejs/out/ -tests/test/ tests/taoshebei/ tests/taoscsv/ tests/taosdalipu/ diff --git a/tests/test/c/tmqDemo.c b/tests/test/c/tmqDemo.c new file mode 100644 index 0000000000..3fe0425c91 --- /dev/null +++ b/tests/test/c/tmqDemo.c @@ -0,0 +1,706 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +// clang-format off + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "taos.h" +#include "taoserror.h" +#include "tlog.h" + +#define GREEN "\033[1;32m" +#define NC "\033[0m" +#define min(a, b) (((a) < (b)) ? (a) : (b)) + +#define MAX_SQL_STR_LEN (1024 * 1024) +#define MAX_ROW_STR_LEN (16 * 1024) + +enum _RUN_MODE { + TMQ_RUN_INSERT_AND_CONSUME, + TMQ_RUN_ONLY_INSERT, + TMQ_RUN_ONLY_CONSUME, + TMQ_RUN_MODE_BUTT +}; + +typedef struct { + char dbName[32]; + char stbName[64]; + char resultFileName[256]; + char vnodeWalPath[256]; + int32_t numOfThreads; + int32_t numOfTables; + int32_t numOfVgroups; + int32_t runMode; + int32_t numOfColumn; + double ratio; + int32_t batchNumOfRow; + int32_t totalRowsOfPerTbl; + int64_t startTimestamp; + int32_t showMsgFlag; + + int32_t totalRowsOfT2; +} SConfInfo; + +static SConfInfo g_stConfInfo = { + "tmqdb", + "stb", + "./tmqResult.txt", // output_file + "/data2/dnode/data/vnode/vnode2/wal", + 1, // threads + 1, // tables + 1, // vgroups + 0, // run mode + 1, // columns + 1, // ratio + 1, // batch size + 10000, // total rows for per table + 0, // 2020-01-01 00:00:00.000 + 0, // show consume msg switch + 10000, +}; + +char* g_pRowValue = NULL; +TdFilePtr g_fp = NULL; + +static void printHelp() { + char indent[10] = " "; + printf("Used to test the performance while create table\n"); + + printf("%s%s\n", indent, "-c"); + printf("%s%s%s%s\n", indent, indent, "Configuration directory, default is ", configDir); + printf("%s%s\n", indent, "-d"); + printf("%s%s%s%s\n", indent, indent, "The name of the database to be created, default is ", g_stConfInfo.dbName); + printf("%s%s\n", indent, "-s"); + printf("%s%s%s%s\n", indent, indent, "The name of the super table to be created, default is ", g_stConfInfo.stbName); + printf("%s%s\n", indent, "-f"); + printf("%s%s%s%s\n", indent, indent, "The file of result, default is ", g_stConfInfo.resultFileName); + printf("%s%s\n", indent, "-w"); + printf("%s%s%s%s\n", indent, indent, "The path of vnode of wal, default is ", g_stConfInfo.vnodeWalPath); + printf("%s%s\n", indent, "-t"); + printf("%s%s%s%d\n", indent, indent, "numOfThreads, default is ", g_stConfInfo.numOfThreads); + printf("%s%s\n", indent, "-n"); + printf("%s%s%s%d\n", indent, indent, "numOfTables, default is ", g_stConfInfo.numOfTables); + printf("%s%s\n", indent, "-v"); + printf("%s%s%s%d\n", indent, indent, "numOfVgroups, default is ", g_stConfInfo.numOfVgroups); + printf("%s%s\n", indent, "-a"); + printf("%s%s%s%d\n", indent, indent, "runMode, default is ", g_stConfInfo.runMode); + printf("%s%s\n", indent, "-l"); + printf("%s%s%s%d\n", indent, indent, "numOfColumn, default is ", g_stConfInfo.numOfColumn); + printf("%s%s\n", indent, "-q"); + printf("%s%s%s%f\n", indent, indent, "ratio, default is ", g_stConfInfo.ratio); + printf("%s%s\n", indent, "-b"); + printf("%s%s%s%d\n", indent, indent, "batchNumOfRow, default is ", g_stConfInfo.batchNumOfRow); + printf("%s%s\n", indent, "-r"); + printf("%s%s%s%d\n", indent, indent, "totalRowsOfPerTbl, default is ", g_stConfInfo.totalRowsOfPerTbl); + printf("%s%s\n", indent, "-m"); + printf("%s%s%s%" PRId64 "\n", indent, indent, "startTimestamp, default is ", g_stConfInfo.startTimestamp); + printf("%s%s\n", indent, "-g"); + printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag); + + exit(EXIT_SUCCESS); +} + +void parseArgument(int32_t argc, char *argv[]) { + + g_stConfInfo.startTimestamp = 1640966400000; // 2020-01-01 00:00:00.000 + + + for (int32_t i = 1; i < argc; i++) { + if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) { + printHelp(); + exit(0); + } else if (strcmp(argv[i], "-d") == 0) { + strcpy(g_stConfInfo.dbName, argv[++i]); + } else if (strcmp(argv[i], "-c") == 0) { + strcpy(configDir, argv[++i]); + } else if (strcmp(argv[i], "-s") == 0) { + strcpy(g_stConfInfo.stbName, argv[++i]); + } else if (strcmp(argv[i], "-w") == 0) { + strcpy(g_stConfInfo.vnodeWalPath, argv[++i]); + } else if (strcmp(argv[i], "-f") == 0) { + strcpy(g_stConfInfo.resultFileName, argv[++i]); + } else if (strcmp(argv[i], "-t") == 0) { + g_stConfInfo.numOfThreads = atoi(argv[++i]); + } else if (strcmp(argv[i], "-n") == 0) { + g_stConfInfo.numOfTables = atoll(argv[++i]); + } else if (strcmp(argv[i], "-v") == 0) { + g_stConfInfo.numOfVgroups = atoi(argv[++i]); + } else if (strcmp(argv[i], "-a") == 0) { + g_stConfInfo.runMode = atoi(argv[++i]); + } else if (strcmp(argv[i], "-b") == 0) { + g_stConfInfo.batchNumOfRow = atoi(argv[++i]); + } else if (strcmp(argv[i], "-r") == 0) { + g_stConfInfo.totalRowsOfPerTbl = atoi(argv[++i]); + } else if (strcmp(argv[i], "-l") == 0) { + g_stConfInfo.numOfColumn = atoi(argv[++i]); + } else if (strcmp(argv[i], "-q") == 0) { + g_stConfInfo.ratio = atof(argv[++i]); + } else if (strcmp(argv[i], "-m") == 0) { + g_stConfInfo.startTimestamp = atol(argv[++i]); + } else if (strcmp(argv[i], "-g") == 0) { + g_stConfInfo.showMsgFlag = atol(argv[++i]); + } else { + pPrint("%s unknow para: %s %s", GREEN, argv[++i], NC); + exit(-1); + } + } + + g_stConfInfo.totalRowsOfT2 = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.ratio; + + pPrint("%s configDir:%s %s", GREEN, configDir, NC); + pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC); + pPrint("%s stbName:%s %s", GREEN, g_stConfInfo.stbName, NC); + pPrint("%s resultFileName:%s %s", GREEN, g_stConfInfo.resultFileName, NC); + pPrint("%s vnodeWalPath:%s %s", GREEN, g_stConfInfo.vnodeWalPath, NC); + pPrint("%s numOfTables:%d %s", GREEN, g_stConfInfo.numOfTables, NC); + pPrint("%s numOfThreads:%d %s", GREEN, g_stConfInfo.numOfThreads, NC); + pPrint("%s numOfVgroups:%d %s", GREEN, g_stConfInfo.numOfVgroups, NC); + pPrint("%s runMode:%d %s", GREEN, g_stConfInfo.runMode, NC); + pPrint("%s ratio:%f %s", GREEN, g_stConfInfo.ratio, NC); + pPrint("%s numOfColumn:%d %s", GREEN, g_stConfInfo.numOfColumn, NC); + pPrint("%s batchNumOfRow:%d %s", GREEN, g_stConfInfo.batchNumOfRow, NC); + pPrint("%s totalRowsOfPerTbl:%d %s", GREEN, g_stConfInfo.totalRowsOfPerTbl, NC); + pPrint("%s totalRowsOfT2:%d %s", GREEN, g_stConfInfo.totalRowsOfT2, NC); + pPrint("%s startTimestamp:%" PRId64" %s", GREEN, g_stConfInfo.startTimestamp, NC); + pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); +} + +static int running = 1; +static void msg_process(tmq_message_t* message) { tmqShowMsg(message); } + +// calc dir size (not include itself 4096Byte) +int64_t getDirectorySize(char *dir) +{ + DIR *dp; + struct dirent *entry; + int64_t totalSize=0; + + if ((dp = opendir(dir)) == NULL) { + fprintf(stderr, "Cannot open dir: %s\n", dir); + return -1; + } + + //lstat(dir, &statbuf); + //totalSize+=statbuf.st_size; + + while ((entry = readdir(dp)) != NULL) { + char subdir[1024]; + sprintf(subdir, "%s/%s", dir, entry->d_name); + + //printf("===d_name: %s\n", entry->d_name); + if (taosIsDir(subdir)) { + if (strcmp(".", entry->d_name) == 0 || strcmp("..", entry->d_name) == 0) { + continue; + } + + int64_t subDirSize = getDirectorySize(subdir); + totalSize+=subDirSize; + } else if (0 == strcmp(strchr(entry->d_name, '.'), ".log")) { // only calc .log file size, and not include .idx file + int64_t file_size = 0; + taosStatFile(subdir, &file_size, NULL); + totalSize+=file_size; + } + } + + closedir(dp); + return totalSize; +} + + +int queryDB(TAOS *taos, char *command) { + TAOS_RES *pRes = taos_query(taos, command); + int code = taos_errno(pRes); + //if ((code != 0) && (code != TSDB_CODE_RPC_AUTH_REQUIRED)) { + if (code != 0) { + pError("failed to reason:%s, sql: %s", tstrerror(code), command); + taos_free_result(pRes); + return -1; + } + taos_free_result(pRes); + return 0 ; +} + +int32_t init_env() { + char sqlStr[1024] = {0}; + + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + sprintf(sqlStr, "create database if not exists %s vgroups %d", g_stConfInfo.dbName, g_stConfInfo.numOfVgroups); + TAOS_RES* pRes = taos_query(pConn, sqlStr); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + sprintf(sqlStr, "use %s", g_stConfInfo.dbName); + pRes = taos_query(pConn, sqlStr); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + // create row value + g_pRowValue = (char*)calloc(1, g_stConfInfo.numOfColumn * 16 + 128); + if (NULL == g_pRowValue) { + return -1; + } + + int32_t dataLen = 0; + int32_t sqlLen = 0; + sqlLen += sprintf(sqlStr+sqlLen, "create stable if not exists %s (ts timestamp, ", g_stConfInfo.stbName); + for (int32_t i = 0; i < g_stConfInfo.numOfColumn; i++) { + if (i == g_stConfInfo.numOfColumn - 1) { + sqlLen += sprintf(sqlStr+sqlLen, "c%d int) ", i); + memcpy(g_pRowValue + dataLen, "66778899", strlen("66778899")); + dataLen += strlen("66778899"); + } else { + sqlLen += sprintf(sqlStr+sqlLen, "c%d int, ", i); + memcpy(g_pRowValue + dataLen, "66778899, ", strlen("66778899, ")); + dataLen += strlen("66778899, "); + } + } + sqlLen += sprintf(sqlStr+sqlLen, "tags (t0 int)"); + + pRes = taos_query(pConn, sqlStr); + if (taos_errno(pRes) != 0) { + printf("failed to create super table %s, reason:%s\n", g_stConfInfo.stbName, taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + for (int32_t i = 0; i < g_stConfInfo.numOfTables; i++) { + sprintf(sqlStr, "create table if not exists %s%d using %s tags(1)", g_stConfInfo.stbName, i, g_stConfInfo.stbName); + pRes = taos_query(pConn, sqlStr); + if (taos_errno(pRes) != 0) { + printf("failed to create child table %s%d, reason:%s\n", g_stConfInfo.stbName, i, taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + } + + //const char* sql = "select * from tu1"; + sprintf(sqlStr, "select * from %s%d", g_stConfInfo.stbName, 0); + pRes = tmq_create_topic(pConn, "test_stb_topic_1", sqlStr, strlen(sqlStr)); + if (taos_errno(pRes) != 0) { + printf("failed to create topic test_stb_topic_1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + taos_close(pConn); + return 0; +} + +tmq_t* build_consumer() { + char sqlStr[1024] = {0}; + + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + sprintf(sqlStr, "use %s", g_stConfInfo.dbName); + TAOS_RES* pRes = taos_query(pConn, sqlStr); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg2"); + tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); + return tmq; +} + +tmq_list_t* build_topic_list() { + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "test_stb_topic_1"); + return topic_list; +} + +void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { + tmq_resp_err_t err; + + if ((err = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); + printf("subscribe err\n"); + return; + } + int32_t cnt = 0; + /*clock_t startTime = clock();*/ + while (running) { + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); + if (tmqmessage) { + cnt++; + msg_process(tmqmessage); + tmq_message_destroy(tmqmessage); + /*} else {*/ + /*break;*/ + } + } + /*clock_t endTime = clock();*/ + /*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/ + + err = tmq_consumer_close(tmq); + if (err) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + +void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { + static const int MIN_COMMIT_COUNT = 1000; + + int msg_count = 0; + tmq_resp_err_t err; + + if ((err = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); + return; + } + + while (running) { + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); + if (tmqmessage) { + msg_process(tmqmessage); + tmq_message_destroy(tmqmessage); + + if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0); + } + } + + err = tmq_consumer_close(tmq); + if (err) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + +void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLogSize) { + tmq_resp_err_t err; + + if ((err = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); + printf("subscribe err\n"); + return; + } + int32_t batchCnt = 0; + int32_t skipLogNum = 0; + int64_t startTime = taosGetTimestampUs(); + while (running) { + tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 500); + if (tmqmessage) { + batchCnt++; + skipLogNum += tmqGetSkipLogNum(tmqmessage); + if (0 != g_stConfInfo.showMsgFlag) { + msg_process(tmqmessage); + } + tmq_message_destroy(tmqmessage); + } else { + break; + } + } + int64_t endTime = taosGetTimestampUs(); + double consumeTime = (double)(endTime - startTime) / 1000000; + + if (batchCnt != totalMsgs) { + pPrint("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC); + } + + pPrint("consume result: msgs: %d, skip log cnt: %d, time used:%.3f second\n", batchCnt, skipLogNum, consumeTime); + taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.2f| %10.2f |\n", batchCnt, consumeTime, (double)batchCnt / consumeTime, (double)walLogSize / (1024 * 1024.0) / consumeTime, (double)walLogSize / 1024.0 / batchCnt); + + err = tmq_consumer_close(tmq); + if (err) { + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); + } +} + +// sync insertion +int32_t syncWriteData() { + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + char sqlStr[1024] = {0}; + sprintf(sqlStr, "use %s", g_stConfInfo.dbName); + TAOS_RES* pRes = taos_query(pConn, sqlStr); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + char* buffer = NULL; + buffer = (char*)malloc(MAX_SQL_STR_LEN); + if (NULL == buffer) { + return -1; + } + + int32_t totalMsgs = 0; + + int64_t time_counter = g_stConfInfo.startTimestamp; + for (int i = 0; i < g_stConfInfo.totalRowsOfPerTbl;) { + for (int tID = 0; tID <= g_stConfInfo.numOfTables - 1; tID++) { + int inserted = i; + int64_t tmp_time = time_counter; + + int32_t data_len = 0; + data_len += sprintf(buffer + data_len, "insert into %s%d values", g_stConfInfo.stbName, tID); + int k; + for (k = 0; k < g_stConfInfo.batchNumOfRow;) { + data_len += sprintf(buffer + data_len, "(%" PRId64 ", %s) ", tmp_time++, g_pRowValue); + inserted++; + k++; + + if (inserted >= g_stConfInfo.totalRowsOfPerTbl) { + break; + } + + if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) { + break; + } + } + + int code = queryDB(pConn, buffer); + if (0 != code){ + fprintf(stderr, "insert data error!\n"); + tfree(buffer); + return -1; + } + + totalMsgs++; + + if (tID == g_stConfInfo.numOfTables - 1) { + i = inserted; + time_counter = tmp_time; + } + } + } + tfree(buffer); + return totalMsgs; +} + + +// sync insertion +int32_t syncWriteDataByRatio() { + TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + char sqlStr[1024] = {0}; + sprintf(sqlStr, "use %s", g_stConfInfo.dbName); + TAOS_RES* pRes = taos_query(pConn, sqlStr); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + char* buffer = NULL; + buffer = (char*)malloc(MAX_SQL_STR_LEN); + if (NULL == buffer) { + return -1; + } + + int32_t totalMsgs = 0; + + int32_t insertedOfT1 = 0; + int32_t insertedOfT2 = 0; + + int64_t tsOfT1 = g_stConfInfo.startTimestamp; + int64_t tsOfT2 = g_stConfInfo.startTimestamp; + int64_t tmp_time; + + for (;;) { + if ((insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) && (insertedOfT2 >= g_stConfInfo.totalRowsOfT2)) { + break; + } + + for (int tID = 0; tID <= g_stConfInfo.numOfTables - 1; tID++) { + if (0 == tID) { + tmp_time = tsOfT1; + if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) { + continue; + } + } else if (1 == tID){ + tmp_time = tsOfT2; + if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) { + continue; + } + } + + int32_t data_len = 0; + data_len += sprintf(buffer + data_len, "insert into %s%d values", g_stConfInfo.stbName, tID); + int k; + for (k = 0; k < g_stConfInfo.batchNumOfRow;) { + data_len += sprintf(buffer + data_len, "(%" PRId64 ", %s) ", tmp_time++, g_pRowValue); + k++; + if (0 == tID) { + insertedOfT1++; + if (insertedOfT1 >= g_stConfInfo.totalRowsOfPerTbl) { + break; + } + } else if (1 == tID){ + insertedOfT2++; + if (insertedOfT2 >= g_stConfInfo.totalRowsOfT2) { + break; + } + } + + if (data_len > MAX_SQL_STR_LEN - MAX_ROW_STR_LEN) { + break; + } + } + + int code = queryDB(pConn, buffer); + if (0 != code){ + fprintf(stderr, "insert data error!\n"); + tfree(buffer); + return -1; + } + + if (0 == tID) { + tsOfT1 = tmp_time; + } else if (1 == tID){ + tsOfT2 = tmp_time; + } + + totalMsgs++; + } + } + pPrint("expect insert rows: T1[%d] T2[%d], actual insert rows: T1[%d] T2[%d]\n", g_stConfInfo.totalRowsOfPerTbl, g_stConfInfo.totalRowsOfT2, insertedOfT1, insertedOfT2); + tfree(buffer); + return totalMsgs; +} + +void printParaIntoFile() { + // FILE *fp = fopen(g_stConfInfo.resultFileName, "a"); + TdFilePtr pFile = taosOpenFile(g_stConfInfo.resultFileName, TD_FILE_CTEATE | TD_FILE_WRITE | TD_FILE_APPEND | TD_FILE_STREAM); + if (NULL == pFile) { + fprintf(stderr, "Failed to open %s for save result\n", g_stConfInfo.resultFileName); + exit -1; + }; + g_fp = pFile; + + time_t tTime = time(NULL); + struct tm tm = *localtime(&tTime); + + taosFprintfFile(pFile, "###################################################################\n"); + taosFprintfFile(pFile, "# configDir: %s\n", configDir); + taosFprintfFile(pFile, "# dbName: %s\n", g_stConfInfo.dbName); + taosFprintfFile(pFile, "# stbName: %s\n", g_stConfInfo.stbName); + taosFprintfFile(pFile, "# vnodeWalPath: %s\n", g_stConfInfo.vnodeWalPath); + taosFprintfFile(pFile, "# numOfTables: %d\n", g_stConfInfo.numOfTables); + taosFprintfFile(pFile, "# numOfThreads: %d\n", g_stConfInfo.numOfThreads); + taosFprintfFile(pFile, "# numOfVgroups: %d\n", g_stConfInfo.numOfVgroups); + taosFprintfFile(pFile, "# runMode: %d\n", g_stConfInfo.runMode); + taosFprintfFile(pFile, "# ratio: %f\n", g_stConfInfo.ratio); + taosFprintfFile(pFile, "# numOfColumn: %d\n", g_stConfInfo.numOfColumn); + taosFprintfFile(pFile, "# batchNumOfRow: %d\n", g_stConfInfo.batchNumOfRow); + taosFprintfFile(pFile, "# totalRowsOfPerTbl: %d\n", g_stConfInfo.totalRowsOfPerTbl); + taosFprintfFile(pFile, "# totalRowsOfT2: %d\n", g_stConfInfo.totalRowsOfT2); + taosFprintfFile(pFile, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1, + tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec); + taosFprintfFile(pFile, "###################################################################\n"); + taosFprintfFile(pFile, "|-------------------------------insert info-----------------------------|--------------------------------consume info---------------------------------|\n"); + taosFprintfFile(pFile, "|batch size| insert msgs | insert time(s) | msgs/s | walLogSize(MB) | consume msgs | consume time(s) | msgs/s | MB/s | avg msg size(KB) |\n"); + taosFprintfFile(g_fp, "|%10d", g_stConfInfo.batchNumOfRow); +} + +int main(int32_t argc, char *argv[]) { + parseArgument(argc, argv); + printParaIntoFile(); + + int64_t walLogSize = 0; + + int code; + code = init_env(); + if (code != 0) { + fprintf(stderr, "%% init_env error!\n"); + return -1; + } + + int32_t totalMsgs = 0; + + if (g_stConfInfo.runMode != TMQ_RUN_ONLY_CONSUME) { + + int64_t startTs = taosGetTimestampUs(); + if (1 == g_stConfInfo.ratio) { + totalMsgs = syncWriteData(); + } else { + totalMsgs = syncWriteDataByRatio(); + } + + if (totalMsgs <= 0) { + pError("inset data error!\n"); + return -1; + } + int64_t endTs = taosGetTimestampUs(); + int64_t delay = endTs - startTs; + + int32_t totalRows = 0; + if (1 == g_stConfInfo.ratio) { + totalRows = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.numOfTables; + } else { + totalRows = g_stConfInfo.totalRowsOfPerTbl * (1 + g_stConfInfo.ratio); + } + + float seconds = delay / 1000000.0; + float rowsSpeed = totalRows / seconds; + float msgsSpeed = totalMsgs / seconds; + + walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath); + if (walLogSize <= 0) { + pError("vnode2/wal size incorrect!"); + } else { + pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize/(1024 * 1024.0)); + } + + pPrint("insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second\n", totalRows, totalMsgs, seconds, rowsSpeed, msgsSpeed); + taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.3f ", totalMsgs, seconds, msgsSpeed, (double)walLogSize/(1024 * 1024.0)); + } + + if (g_stConfInfo.runMode == TMQ_RUN_ONLY_INSERT) { + return 0; + } + + tmq_t* tmq = build_consumer(); + tmq_list_t* topic_list = build_topic_list(); + if ((NULL == tmq) || (NULL == topic_list)){ + return -1; + } + + perf_loop(tmq, topic_list, totalMsgs, walLogSize); + + tfree(g_pRowValue); + taosFprintfFile(g_fp, "\n"); + taosCloseFile(&g_fp); + return 0; +} +