From b3fe36a0bea15fda1f60acaa4dcdf73b07721048 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Sat, 16 Jul 2022 17:45:55 +0800 Subject: [PATCH] test: extent tmqSim --- tests/test/c/tmqSim.c | 525 +++++++++++++++++++++++++++++++++++++++++- 1 file changed, 521 insertions(+), 4 deletions(-) diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index b4f86d52b5..5459e3f159 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -20,6 +20,7 @@ #include #include #include +#include #include "taos.h" #include "taosdef.h" @@ -35,6 +36,8 @@ #define MAX_ROW_STR_LEN (16 * 1024) #define MAX_CONSUMER_THREAD_CNT (16) #define MAX_VGROUP_CNT (32) +#define SEND_TIME_UNIT 10 // ms +#define MAX_SQL_LEN 1048576 typedef enum { NOTIFY_CMD_START_CONSUM, @@ -42,6 +45,12 @@ typedef enum { NOTIFY_CMD_ID_BUTT, } NOTIFY_CMD_ID; +typedef enum enumQUERY_TYPE { + NO_INSERT_TYPE, + INSERT_TYPE, + QUERY_TYPE_BUT +} QUERY_TYPE; + typedef struct { TdThread thread; int32_t consumerId; @@ -58,6 +67,7 @@ typedef struct { int64_t consumeMsgCnt; int64_t consumeRowCnt; + int64_t consumeLen; int32_t checkresult; char topicString[1024]; @@ -77,14 +87,19 @@ typedef struct { int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume int64_t ts; - TAOS* taos; + TAOS* taos; + + // below parameters is used by omb test + int32_t producerRate; // unit: msgs/s + int64_t totalProduceMsgs; + int64_t totalMsgsLen; } SThreadInfo; typedef struct { // input from argvs char cdbName[32]; - char dbName[32]; + char dbName[64]; int32_t showMsgFlag; int32_t showRowFlag; int32_t saveRowFlag; @@ -93,11 +108,22 @@ typedef struct { int32_t useSnapshot; int64_t nowTime; SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT]; + + SThreadInfo stProdThreads[MAX_CONSUMER_THREAD_CNT]; + + // below parameters is used by omb test + char topic[64]; + int32_t producers; + int32_t producerRate; + int32_t runDurationMinutes; + int32_t batchSize; + int32_t payloadLen; } SConfInfo; static SConfInfo g_stConfInfo; TdFilePtr g_fp = NULL; static int running = 1; +char* g_payload = NULL; // char* g_pRowValue = NULL; // TdFilePtr g_fp = NULL; @@ -117,7 +143,29 @@ static void printHelp() { printf("%s%s\n", indent, "-s"); printf("%s%s%s%d\n", indent, indent, "saveRowFlag, default is ", g_stConfInfo.saveRowFlag); printf("%s%s\n", indent, "-y"); - printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay); + printf("%s%s%s%ds\n", indent, indent, "consume delay, default is ", g_stConfInfo.consumeDelay); + printf("%s%s\n", indent, "-e"); + printf("%s%s%s%d\n", indent, indent, "snapshot, default is ", g_stConfInfo.useSnapshot); + + printf("%s%s\n", indent, "-t"); + printf("%s%s%s\n", indent, indent, "topic name, default is null"); + + printf("%s%s\n", indent, "-x"); + printf("%s%s%s\n", indent, indent, "consume thread number, default is 1"); + + + printf("%s%s\n", indent, "-l"); + printf("%s%s%s\n", indent, indent, "run duration unit is minutes, default is ", g_stConfInfo.runDurationMinutes); + printf("%s%s\n", indent, "-p"); + printf("%s%s%s\n", indent, indent, "producer thread number, default is 0"); + printf("%s%s\n", indent, "-b"); + printf("%s%s%s\n", indent, indent, "batch size, default is 1"); + printf("%s%s\n", indent, "-i"); + printf("%s%s%s\n", indent, indent, "produce rate unit is msgs /s, default is 100000"); + printf("%s%s\n", indent, "-n"); + printf("%s%s%s\n", indent, indent, "payload len unit is byte, default is 1000"); + + exit(EXIT_SUCCESS); } @@ -144,7 +192,11 @@ void initLogFile() { pid_t process_id = getpid(); - sprintf(filename, "%s/../log/tmqlog-%d-%s.txt", configDir, process_id, getCurrentTimeString(tmpString)); + if (0 != strlen(g_stConfInfo.topic)) { + sprintf(filename, "/tmp/tmqlog-%d-%s.txt", process_id, getCurrentTimeString(tmpString)); + } else { + sprintf(filename, "%s/../log/tmqlog-%d-%s.txt", configDir, process_id, getCurrentTimeString(tmpString)); + } #ifdef WINDOWS for (int i = 2; i < sizeof(filename); i++) { if (filename[i] == ':') filename[i] = '-'; @@ -199,6 +251,9 @@ void parseArgument(int32_t argc, char* argv[]) { g_stConfInfo.showRowFlag = 0; g_stConfInfo.saveRowFlag = 0; g_stConfInfo.consumeDelay = 5; + g_stConfInfo.numOfThread = 1; + g_stConfInfo.batchSize = 1; + g_stConfInfo.producers = 0; g_stConfInfo.nowTime = taosGetTimestampMs(); @@ -222,12 +277,38 @@ void parseArgument(int32_t argc, char* argv[]) { g_stConfInfo.consumeDelay = atol(argv[++i]); } else if (strcmp(argv[i], "-e") == 0) { g_stConfInfo.useSnapshot = atol(argv[++i]); + } else if (strcmp(argv[i], "-t") == 0) { + char tmpBuf[56]; + strcpy(tmpBuf, argv[++i]); + sprintf(g_stConfInfo.topic, "`%s`", tmpBuf); + } else if (strcmp(argv[i], "-x") == 0) { + g_stConfInfo.numOfThread = atol(argv[++i]); + } else if (strcmp(argv[i], "-l") == 0) { + g_stConfInfo.runDurationMinutes = atol(argv[++i]); + } else if (strcmp(argv[i], "-p") == 0) { + g_stConfInfo.producers = atol(argv[++i]); + } else if (strcmp(argv[i], "-b") == 0) { + g_stConfInfo.batchSize = atol(argv[++i]); + } else if (strcmp(argv[i], "-i") == 0) { + g_stConfInfo.producerRate = atol(argv[++i]); + } else if (strcmp(argv[i], "-n") == 0) { + g_stConfInfo.payloadLen = atol(argv[++i]); } else { pError("%s unknow para: %s %s", GREEN, argv[++i], NC); exit(-1); } } + g_payload = taosMemoryCalloc(g_stConfInfo.payloadLen + 1, 1); + if (NULL == g_payload) { + pPrint("%s failed to malloc for payload %s", GREEN, NC); + exit(-1); + } + + for (int32_t i = 0; i < g_stConfInfo.payloadLen; i++) { + strcpy(&g_payload[i], "a"); + } + initLogFile(); taosFprintfFile(g_fp, "====parseArgument() success\n"); @@ -240,6 +321,11 @@ void parseArgument(int32_t argc, char* argv[]) { pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC); pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC); pPrint("%s saveRowFlag:%d %s", GREEN, g_stConfInfo.saveRowFlag, NC); + + pPrint("%s snapshot:%d %s", GREEN, g_stConfInfo.useSnapshot, NC); + + pPrint("%s omb topic:%s %s", GREEN, g_stConfInfo.topic, NC); + pPrint("%s numOfThread:%d %s", GREEN, g_stConfInfo.numOfThread, NC); #endif } @@ -909,8 +995,439 @@ int32_t getConsumeInfo() { return 0; } + +static int32_t omb_data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex, int64_t* lenOfRows) { + char buf[16*1024]; + int32_t totalRows = 0; + int32_t totalLen = 0; + + // printf("topic: %s\n", tmq_get_topic_name(msg)); + //int32_t vgroupId = tmq_get_vgroup_id(msg); + //const char* dbName = tmq_get_db_name(msg); + + //taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex); + //taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table", + // tmq_get_topic_name(msg), vgroupId); + + while (1) { + TAOS_ROW row = taos_fetch_row(msg); + + if (row == NULL) break; + + TAOS_FIELD* fields = taos_fetch_fields(msg); + int32_t numOfFields = taos_field_count(msg); + //int32_t* length = taos_fetch_lengths(msg); + //int32_t precision = taos_result_precision(msg); + //const char* tbName = tmq_get_table_name(msg); + + taos_print_row(buf, row, fields, numOfFields); + totalLen += strlen(buf); + totalRows++; + } + + *lenOfRows = totalLen; + return totalRows; +} + +void omb_loop_consume(SThreadInfo* pInfo) { + int32_t code; + + int32_t once_flag = 0; + + int64_t totalMsgs = 0; + int64_t totalRows = 0; + + char tmpString[128]; + taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), + pInfo->consumerId); + printf("%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), + pInfo->consumerId); + + pInfo->ts = taosGetTimestampMs(); + + int64_t lastTotalMsgs = 0; + uint64_t lastPrintTime = taosGetTimestampMs(); + uint64_t startTs = taosGetTimestampMs(); + + int64_t totalLenOfMsg = 0; + int64_t lastTotalLenOfMsg = 0; + int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000); + while (running) { + TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay); + if (tmqMsg) { + int64_t lenOfMsg = 0; + totalRows += omb_data_msg_process(tmqMsg, pInfo, totalMsgs, &lenOfMsg); + totalLenOfMsg += lenOfMsg; + taos_free_result(tmqMsg); + totalMsgs++; + int64_t currentPrintTime = taosGetTimestampMs(); + if (currentPrintTime - lastPrintTime > 10 * 1000) { + int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg; + int64_t deltaTime = currentPrintTime - lastPrintTime; + printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64 ", rate: %.3f msgs/s, %.1f MB/s\n", + pInfo->consumerId, totalRows, totalMsgs, + (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime, + currentLenOfMsg*1000.0/(1024*1024)/deltaTime); + + taosFprintfFile( + g_fp, "consumer id %d has currently poll total msgs: %" PRId64 ", period cons rate: %.3f msgs/s, %.1f MB/s\n", + pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime, currentLenOfMsg*1000.0/deltaTime); + lastPrintTime = currentPrintTime; + lastTotalMsgs = totalMsgs; + lastTotalLenOfMsg = totalLenOfMsg; + } + } else { + char tmpString[128]; + taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString)); + printf("%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString)); + int64_t currentPrintTime = taosGetTimestampMs(); + int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg; + int64_t deltaTime = currentPrintTime - lastPrintTime; + printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64 ", rate: %.3f msgs/s, %.1f MB/s\n", + pInfo->consumerId, totalRows, totalMsgs, + (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime, + currentLenOfMsg*1000.0/(1024*1024)/deltaTime); + break; + } + } + + pInfo->consumeMsgCnt = totalMsgs; + pInfo->consumeRowCnt = totalRows; + pInfo->consumeLen = totalLenOfMsg; + +} + + +void* ombConsumeThreadFunc(void* param) { + SThreadInfo* pInfo = (SThreadInfo*)param; + + //################### set key ######################## + tmq_conf_t* conf = tmq_conf_new(); + // tmq_conf_set(conf, "td.connect.ip", "localhost"); + // tmq_conf_set(conf, "td.connect.port", "6030"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + // tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName); + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, pInfo); + tmq_conf_set(conf, "group.id", "ombCgrp"); + // tmq_conf_set(conf, "msg.with.table.name", "true"); + // tmq_conf_set(conf, "client.id", "c-001"); + // tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "enable.auto.commit", "false"); + // tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); + // tmq_conf_set(conf, "auto.offset.reset", "none"); + // tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + // + if (g_stConfInfo.useSnapshot) { + tmq_conf_set(conf, "experimental.snapshot.enable", "true"); + } + + pInfo->tmq = tmq_consumer_new(conf, NULL, 0); + + tmq_conf_destroy(conf); + + //################### set topic ########################## + pInfo->topicList = tmq_list_new(); + tmq_list_append(pInfo->topicList, g_stConfInfo.topic); + + if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) { + taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n"); + assert(0); + return NULL; + } + + int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList); + if (err != 0) { + pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err)); + taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err)); + assert(0); + return NULL; + } + + tmq_list_destroy(pInfo->topicList); + pInfo->topicList = NULL; + + omb_loop_consume(pInfo); + + err = tmq_unsubscribe(pInfo->tmq); + if (err != 0) { + pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); + taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err)); + } + + err = tmq_consumer_close(pInfo->tmq); + if (err != 0) { + pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err)); + taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err)); + } + pInfo->tmq = NULL; + + return NULL; +} + + + +static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type) { + TAOS_RES *res = taos_query(taos, command); + int32_t code = taos_errno(res); + + if (code != 0) { + pPrint("%s Failed to execute <%s>, reason: %s %s", GREEN, command, taos_errstr(res), NC); + taos_free_result(res); + return -1; + } + + if (INSERT_TYPE == type) { + int affectedRows = taos_affected_rows(res); + taos_free_result(res); + return affectedRows; + } + + taos_free_result(res); + return 0; +} + +void* ombProduceThreadFunc(void* param) { + SThreadInfo* pInfo = (SThreadInfo*)param; + + pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0); + if (pInfo->taos == NULL) { + printf("taos_connect() fail\n"); + return NULL; + } + + int64_t affectedRowsTotal = 0; + int64_t sendMsgs = 0; + + uint32_t totalSendLoopTimes = g_stConfInfo.runDurationMinutes * 60 * 1000 / SEND_TIME_UNIT; // send some msgs per 10ms + uint32_t batchPerTblTimes = pInfo->producerRate / 100 / g_stConfInfo.batchSize; + uint32_t remainder = (pInfo->producerRate / 100) % g_stConfInfo.batchSize; + if (remainder) { + batchPerTblTimes += 1; + } + + char* sqlBuf = taosMemoryMalloc(MAX_SQL_LEN); + if (NULL == sqlBuf) { + printf("malloc fail for sqlBuf\n"); + return NULL; + } + + printf("Produce Info: totalSendLoopTimes: %d, batchPerTblTimes: %d, producerRate: %d\n", totalSendLoopTimes, batchPerTblTimes, pInfo->producerRate); + + char ctbName[64] = {0}; + sprintf(ctbName, "%s.ctb%d", g_stConfInfo.dbName, pInfo->consumerId); + + int64_t lastPrintTime = taosGetTimestampUs(); + int64_t totalMsgLen = 0; + //int64_t timeStamp = taosGetTimestampUs(); + while (totalSendLoopTimes) { + int64_t startTs = taosGetTimestampUs(); + for (int i = 0; i < batchPerTblTimes; ++i) { + uint32_t msgsOfSql = g_stConfInfo.batchSize; + if ((i == batchPerTblTimes - 1) && (0 != remainder)) { + msgsOfSql = remainder; + } + int len = 0; + len += snprintf(sqlBuf+len, MAX_SQL_LEN - len, "insert into %s values ", ctbName); + for (int j = 0; j < msgsOfSql; j++) { + int64_t timeStamp = taosGetTimestampNs(); + len += snprintf(sqlBuf+len, MAX_SQL_LEN - len, "(%" PRId64 ", \"%s\")", timeStamp, g_payload); + sendMsgs++; + pInfo->totalProduceMsgs++; + } + + totalMsgLen += len; + pInfo->totalMsgsLen += len; + + int64_t affectedRows = queryDbExec(pInfo->taos, sqlBuf, INSERT_TYPE); + if (affectedRows < 0) { + return NULL; + } + + affectedRowsTotal += affectedRows; + + //printf("Produce Info: affectedRows: %" PRId64 "\n", affectedRows); + } + totalSendLoopTimes -= 1; + + // calc spent time + int64_t currentTs = taosGetTimestampUs(); + int64_t delta = currentTs - startTs; + if (delta < SEND_TIME_UNIT * 1000) { + int64_t sleepLen = (int32_t)(SEND_TIME_UNIT * 1000 - delta); + //printf("sleep %" PRId64 " us, use time: %" PRId64 " us\n", sleepLen, delta); + taosUsleep((int32_t)sleepLen); + } + + currentTs = taosGetTimestampUs(); + delta = currentTs - lastPrintTime; + if (delta > 10 * 1000 * 1000) { + printf("producer[%d] info: %" PRId64 " msgs, %" PRId64 " Byte, %" PRId64 " us, totalSendLoopTimes: %d\n", + pInfo->consumerId, sendMsgs, totalMsgLen, delta, totalSendLoopTimes); + printf("producer[%d] rate: %1.f msgs/s, %1.f KB/s\n", + pInfo->consumerId, + sendMsgs * 1000.0 * 1000 / delta, + (totalMsgLen / 1024.0) / (delta / (1000*1000))); + lastPrintTime = currentTs; + sendMsgs = 0; + totalMsgLen = 0; + } + } + + printf("affectedRowsTotal: %"PRId64"\n", affectedRowsTotal); + return NULL; +} + + +void printProduceInfo(int64_t start) { + int64_t totalMsgs = 0; + int64_t totalLenOfMsgs = 0; + for (int i = 0; i < g_stConfInfo.producers; i++) { + totalMsgs += g_stConfInfo.stProdThreads[i].totalProduceMsgs; + totalLenOfMsgs += g_stConfInfo.stProdThreads[i].totalMsgsLen; + } + + int64_t end = taosGetTimestampUs(); + + int64_t t = end - start; + if (0 == t) t = 1; + + double tInMs = (double)t / 1000000.0; + printf("Spent %.3f seconds to prod %" PRIu64 " msgs, %" PRIu64 " Byte\n\n", tInMs, totalMsgs, totalLenOfMsgs); + + + printf("Spent %.3f seconds to prod %" PRIu64 " msgs with %d producer(s), throughput: %.3f msgs/s, %.1f MB/s\n\n", + tInMs, totalMsgs, g_stConfInfo.producers, + (double)totalMsgs / tInMs, + (double)totalLenOfMsgs/(1024.0*1024)/tInMs); + return; +} + + +void startOmbConsume() { + TdThreadAttr thattr; + taosThreadAttrInit(&thattr); + taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE); + + if (0 != g_stConfInfo.producers) { + TAOS* taos = taos_connect(NULL, "root", "taosdata", NULL, 0); + if (taos == NULL) { + taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n"); + ASSERT(0); + return ; + } + + char stbName[16] = "stb"; + char ctbPrefix[16] = "ctb"; + + char sql[256] = {0}; + sprintf(sql, "drop database if exists %s", g_stConfInfo.dbName); + printf("SQL: %s\n", sql); + queryDbExec(taos, sql, NO_INSERT_TYPE); + + sprintf(sql, "create database if not exists %s precision 'ns' vgroups %d", g_stConfInfo.dbName, g_stConfInfo.producers); + printf("SQL: %s\n", sql); + queryDbExec(taos, sql, NO_INSERT_TYPE); + + sprintf(sql, "create stable %s.%s (ts timestamp, payload binary(%d)) tags (t bigint) ", g_stConfInfo.dbName, stbName, g_stConfInfo.payloadLen); + printf("SQL: %s\n", sql); + queryDbExec(taos, sql, NO_INSERT_TYPE); + + for (int i = 0; i < g_stConfInfo.producers; i++) { + sprintf(sql, "create table %s.%s%d using %s.stb tags(%d) ", g_stConfInfo.dbName, ctbPrefix, i, g_stConfInfo.dbName, i); + printf("SQL: %s\n", sql); + queryDbExec(taos, sql, NO_INSERT_TYPE); + } + + // create topic + sprintf(sql, "create topic %s as stable %s.%s", g_stConfInfo.topic, g_stConfInfo.dbName, stbName); + printf("SQL: %s\n", sql); + queryDbExec(taos, sql, NO_INSERT_TYPE); + + + int32_t producerRate = ceil(g_stConfInfo.producerRate / g_stConfInfo.producers); + + printf("==== create %d produce thread ====\n", g_stConfInfo.producers); + for (int32_t i = 0; i < g_stConfInfo.producers; ++i) { + g_stConfInfo.stProdThreads[i].consumerId = i; + g_stConfInfo.stProdThreads[i].producerRate = producerRate; + taosThreadCreate(&(g_stConfInfo.stProdThreads[i].thread), &thattr, ombProduceThreadFunc, + (void*)(&(g_stConfInfo.stProdThreads[i]))); + } + + if (0 == g_stConfInfo.numOfThread) { + int64_t start = taosGetTimestampUs(); + for (int32_t i = 0; i < g_stConfInfo.producers; i++) { + taosThreadJoin(g_stConfInfo.stProdThreads[i].thread, NULL); + taosThreadClear(&g_stConfInfo.stProdThreads[i].thread); + } + + printProduceInfo(start); + + taosFprintfFile(g_fp, "==== close tmqlog ====\n"); + taosCloseFile(&g_fp); + return; + } + + } + + // pthread_create one thread to consume + taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread); + for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) { + g_stConfInfo.stThreads[i].consumerId = i; + taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, ombConsumeThreadFunc, + (void*)(&(g_stConfInfo.stThreads[i]))); + } + + int64_t start = taosGetTimestampUs(); + + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { + taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL); + taosThreadClear(&g_stConfInfo.stThreads[i].thread); + } + + int64_t end = taosGetTimestampUs(); + + int64_t totalRows = 0; + int64_t totalMsgs = 0; + int64_t totalLenOfMsgs = 0; + for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { + totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt; + totalLenOfMsgs += g_stConfInfo.stThreads[i].consumeLen; + totalRows += g_stConfInfo.stThreads[i].consumeRowCnt; + } + + int64_t t = end - start; + if (0 == t) t = 1; + + double tInMs = (double)t / 1000000.0; + taosFprintfFile(g_fp, + "Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n", + tInMs, totalMsgs, g_stConfInfo.numOfThread, + (double)(totalMsgs / tInMs), + (double)totalLenOfMsgs/(1024*1024)/tInMs); + + printf("Spent %.3f seconds to cons rows: %" PRIu64 " msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n", + tInMs, totalRows, totalMsgs, g_stConfInfo.numOfThread, + (double)(totalMsgs / tInMs), + (double)totalLenOfMsgs/(1024*1024)/tInMs); + + taosFprintfFile(g_fp, "==== close tmqlog ====\n"); + taosCloseFile(&g_fp); + + return; +} + + int main(int32_t argc, char* argv[]) { parseArgument(argc, argv); + + if (0 != strlen(g_stConfInfo.topic)) { + startOmbConsume(); + return 0; + } + getConsumeInfo(); saveConfigToLogFile();