test: extent tmqSim
This commit is contained in:
parent
4e18941000
commit
b3fe36a0be
|
@ -20,6 +20,7 @@
|
|||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
#include <time.h>
|
||||
#include <math.h>
|
||||
|
||||
#include "taos.h"
|
||||
#include "taosdef.h"
|
||||
|
@ -35,6 +36,8 @@
|
|||
#define MAX_ROW_STR_LEN (16 * 1024)
|
||||
#define MAX_CONSUMER_THREAD_CNT (16)
|
||||
#define MAX_VGROUP_CNT (32)
|
||||
#define SEND_TIME_UNIT 10 // ms
|
||||
#define MAX_SQL_LEN 1048576
|
||||
|
||||
typedef enum {
|
||||
NOTIFY_CMD_START_CONSUM,
|
||||
|
@ -42,6 +45,12 @@ typedef enum {
|
|||
NOTIFY_CMD_ID_BUTT,
|
||||
} NOTIFY_CMD_ID;
|
||||
|
||||
typedef enum enumQUERY_TYPE {
|
||||
NO_INSERT_TYPE,
|
||||
INSERT_TYPE,
|
||||
QUERY_TYPE_BUT
|
||||
} QUERY_TYPE;
|
||||
|
||||
typedef struct {
|
||||
TdThread thread;
|
||||
int32_t consumerId;
|
||||
|
@ -58,6 +67,7 @@ typedef struct {
|
|||
|
||||
int64_t consumeMsgCnt;
|
||||
int64_t consumeRowCnt;
|
||||
int64_t consumeLen;
|
||||
int32_t checkresult;
|
||||
|
||||
char topicString[1024];
|
||||
|
@ -77,14 +87,19 @@ typedef struct {
|
|||
int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume
|
||||
int64_t ts;
|
||||
|
||||
TAOS* taos;
|
||||
TAOS* taos;
|
||||
|
||||
// below parameters is used by omb test
|
||||
int32_t producerRate; // unit: msgs/s
|
||||
int64_t totalProduceMsgs;
|
||||
int64_t totalMsgsLen;
|
||||
|
||||
} SThreadInfo;
|
||||
|
||||
typedef struct {
|
||||
// input from argvs
|
||||
char cdbName[32];
|
||||
char dbName[32];
|
||||
char dbName[64];
|
||||
int32_t showMsgFlag;
|
||||
int32_t showRowFlag;
|
||||
int32_t saveRowFlag;
|
||||
|
@ -93,11 +108,22 @@ typedef struct {
|
|||
int32_t useSnapshot;
|
||||
int64_t nowTime;
|
||||
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
|
||||
|
||||
SThreadInfo stProdThreads[MAX_CONSUMER_THREAD_CNT];
|
||||
|
||||
// below parameters is used by omb test
|
||||
char topic[64];
|
||||
int32_t producers;
|
||||
int32_t producerRate;
|
||||
int32_t runDurationMinutes;
|
||||
int32_t batchSize;
|
||||
int32_t payloadLen;
|
||||
} SConfInfo;
|
||||
|
||||
static SConfInfo g_stConfInfo;
|
||||
TdFilePtr g_fp = NULL;
|
||||
static int running = 1;
|
||||
char* g_payload = NULL;
|
||||
|
||||
// char* g_pRowValue = NULL;
|
||||
// TdFilePtr g_fp = NULL;
|
||||
|
@ -117,7 +143,29 @@ static void printHelp() {
|
|||
printf("%s%s\n", indent, "-s");
|
||||
printf("%s%s%s%d\n", indent, indent, "saveRowFlag, default is ", g_stConfInfo.saveRowFlag);
|
||||
printf("%s%s\n", indent, "-y");
|
||||
printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
|
||||
printf("%s%s%s%ds\n", indent, indent, "consume delay, default is ", g_stConfInfo.consumeDelay);
|
||||
printf("%s%s\n", indent, "-e");
|
||||
printf("%s%s%s%d\n", indent, indent, "snapshot, default is ", g_stConfInfo.useSnapshot);
|
||||
|
||||
printf("%s%s\n", indent, "-t");
|
||||
printf("%s%s%s\n", indent, indent, "topic name, default is null");
|
||||
|
||||
printf("%s%s\n", indent, "-x");
|
||||
printf("%s%s%s\n", indent, indent, "consume thread number, default is 1");
|
||||
|
||||
|
||||
printf("%s%s\n", indent, "-l");
|
||||
printf("%s%s%s\n", indent, indent, "run duration unit is minutes, default is ", g_stConfInfo.runDurationMinutes);
|
||||
printf("%s%s\n", indent, "-p");
|
||||
printf("%s%s%s\n", indent, indent, "producer thread number, default is 0");
|
||||
printf("%s%s\n", indent, "-b");
|
||||
printf("%s%s%s\n", indent, indent, "batch size, default is 1");
|
||||
printf("%s%s\n", indent, "-i");
|
||||
printf("%s%s%s\n", indent, indent, "produce rate unit is msgs /s, default is 100000");
|
||||
printf("%s%s\n", indent, "-n");
|
||||
printf("%s%s%s\n", indent, indent, "payload len unit is byte, default is 1000");
|
||||
|
||||
|
||||
exit(EXIT_SUCCESS);
|
||||
}
|
||||
|
||||
|
@ -144,7 +192,11 @@ void initLogFile() {
|
|||
|
||||
pid_t process_id = getpid();
|
||||
|
||||
sprintf(filename, "%s/../log/tmqlog-%d-%s.txt", configDir, process_id, getCurrentTimeString(tmpString));
|
||||
if (0 != strlen(g_stConfInfo.topic)) {
|
||||
sprintf(filename, "/tmp/tmqlog-%d-%s.txt", process_id, getCurrentTimeString(tmpString));
|
||||
} else {
|
||||
sprintf(filename, "%s/../log/tmqlog-%d-%s.txt", configDir, process_id, getCurrentTimeString(tmpString));
|
||||
}
|
||||
#ifdef WINDOWS
|
||||
for (int i = 2; i < sizeof(filename); i++) {
|
||||
if (filename[i] == ':') filename[i] = '-';
|
||||
|
@ -199,6 +251,9 @@ void parseArgument(int32_t argc, char* argv[]) {
|
|||
g_stConfInfo.showRowFlag = 0;
|
||||
g_stConfInfo.saveRowFlag = 0;
|
||||
g_stConfInfo.consumeDelay = 5;
|
||||
g_stConfInfo.numOfThread = 1;
|
||||
g_stConfInfo.batchSize = 1;
|
||||
g_stConfInfo.producers = 0;
|
||||
|
||||
g_stConfInfo.nowTime = taosGetTimestampMs();
|
||||
|
||||
|
@ -222,12 +277,38 @@ void parseArgument(int32_t argc, char* argv[]) {
|
|||
g_stConfInfo.consumeDelay = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-e") == 0) {
|
||||
g_stConfInfo.useSnapshot = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-t") == 0) {
|
||||
char tmpBuf[56];
|
||||
strcpy(tmpBuf, argv[++i]);
|
||||
sprintf(g_stConfInfo.topic, "`%s`", tmpBuf);
|
||||
} else if (strcmp(argv[i], "-x") == 0) {
|
||||
g_stConfInfo.numOfThread = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-l") == 0) {
|
||||
g_stConfInfo.runDurationMinutes = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-p") == 0) {
|
||||
g_stConfInfo.producers = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-b") == 0) {
|
||||
g_stConfInfo.batchSize = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-i") == 0) {
|
||||
g_stConfInfo.producerRate = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-n") == 0) {
|
||||
g_stConfInfo.payloadLen = atol(argv[++i]);
|
||||
} else {
|
||||
pError("%s unknow para: %s %s", GREEN, argv[++i], NC);
|
||||
exit(-1);
|
||||
}
|
||||
}
|
||||
|
||||
g_payload = taosMemoryCalloc(g_stConfInfo.payloadLen + 1, 1);
|
||||
if (NULL == g_payload) {
|
||||
pPrint("%s failed to malloc for payload %s", GREEN, NC);
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < g_stConfInfo.payloadLen; i++) {
|
||||
strcpy(&g_payload[i], "a");
|
||||
}
|
||||
|
||||
initLogFile();
|
||||
|
||||
taosFprintfFile(g_fp, "====parseArgument() success\n");
|
||||
|
@ -240,6 +321,11 @@ void parseArgument(int32_t argc, char* argv[]) {
|
|||
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
|
||||
pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
|
||||
pPrint("%s saveRowFlag:%d %s", GREEN, g_stConfInfo.saveRowFlag, NC);
|
||||
|
||||
pPrint("%s snapshot:%d %s", GREEN, g_stConfInfo.useSnapshot, NC);
|
||||
|
||||
pPrint("%s omb topic:%s %s", GREEN, g_stConfInfo.topic, NC);
|
||||
pPrint("%s numOfThread:%d %s", GREEN, g_stConfInfo.numOfThread, NC);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -909,8 +995,439 @@ int32_t getConsumeInfo() {
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
static int32_t omb_data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex, int64_t* lenOfRows) {
|
||||
char buf[16*1024];
|
||||
int32_t totalRows = 0;
|
||||
int32_t totalLen = 0;
|
||||
|
||||
// printf("topic: %s\n", tmq_get_topic_name(msg));
|
||||
//int32_t vgroupId = tmq_get_vgroup_id(msg);
|
||||
//const char* dbName = tmq_get_db_name(msg);
|
||||
|
||||
//taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
|
||||
//taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
|
||||
// tmq_get_topic_name(msg), vgroupId);
|
||||
|
||||
while (1) {
|
||||
TAOS_ROW row = taos_fetch_row(msg);
|
||||
|
||||
if (row == NULL) break;
|
||||
|
||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||
int32_t numOfFields = taos_field_count(msg);
|
||||
//int32_t* length = taos_fetch_lengths(msg);
|
||||
//int32_t precision = taos_result_precision(msg);
|
||||
//const char* tbName = tmq_get_table_name(msg);
|
||||
|
||||
taos_print_row(buf, row, fields, numOfFields);
|
||||
totalLen += strlen(buf);
|
||||
totalRows++;
|
||||
}
|
||||
|
||||
*lenOfRows = totalLen;
|
||||
return totalRows;
|
||||
}
|
||||
|
||||
void omb_loop_consume(SThreadInfo* pInfo) {
|
||||
int32_t code;
|
||||
|
||||
int32_t once_flag = 0;
|
||||
|
||||
int64_t totalMsgs = 0;
|
||||
int64_t totalRows = 0;
|
||||
|
||||
char tmpString[128];
|
||||
taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
|
||||
pInfo->consumerId);
|
||||
printf("%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
|
||||
pInfo->consumerId);
|
||||
|
||||
pInfo->ts = taosGetTimestampMs();
|
||||
|
||||
int64_t lastTotalMsgs = 0;
|
||||
uint64_t lastPrintTime = taosGetTimestampMs();
|
||||
uint64_t startTs = taosGetTimestampMs();
|
||||
|
||||
int64_t totalLenOfMsg = 0;
|
||||
int64_t lastTotalLenOfMsg = 0;
|
||||
int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
|
||||
while (running) {
|
||||
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
|
||||
if (tmqMsg) {
|
||||
int64_t lenOfMsg = 0;
|
||||
totalRows += omb_data_msg_process(tmqMsg, pInfo, totalMsgs, &lenOfMsg);
|
||||
totalLenOfMsg += lenOfMsg;
|
||||
taos_free_result(tmqMsg);
|
||||
totalMsgs++;
|
||||
int64_t currentPrintTime = taosGetTimestampMs();
|
||||
if (currentPrintTime - lastPrintTime > 10 * 1000) {
|
||||
int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg;
|
||||
int64_t deltaTime = currentPrintTime - lastPrintTime;
|
||||
printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64 ", rate: %.3f msgs/s, %.1f MB/s\n",
|
||||
pInfo->consumerId, totalRows, totalMsgs,
|
||||
(totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime,
|
||||
currentLenOfMsg*1000.0/(1024*1024)/deltaTime);
|
||||
|
||||
taosFprintfFile(
|
||||
g_fp, "consumer id %d has currently poll total msgs: %" PRId64 ", period cons rate: %.3f msgs/s, %.1f MB/s\n",
|
||||
pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime, currentLenOfMsg*1000.0/deltaTime);
|
||||
lastPrintTime = currentPrintTime;
|
||||
lastTotalMsgs = totalMsgs;
|
||||
lastTotalLenOfMsg = totalLenOfMsg;
|
||||
}
|
||||
} else {
|
||||
char tmpString[128];
|
||||
taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
|
||||
printf("%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
|
||||
int64_t currentPrintTime = taosGetTimestampMs();
|
||||
int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg;
|
||||
int64_t deltaTime = currentPrintTime - lastPrintTime;
|
||||
printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64 ", rate: %.3f msgs/s, %.1f MB/s\n",
|
||||
pInfo->consumerId, totalRows, totalMsgs,
|
||||
(totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime,
|
||||
currentLenOfMsg*1000.0/(1024*1024)/deltaTime);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
pInfo->consumeMsgCnt = totalMsgs;
|
||||
pInfo->consumeRowCnt = totalRows;
|
||||
pInfo->consumeLen = totalLenOfMsg;
|
||||
|
||||
}
|
||||
|
||||
|
||||
void* ombConsumeThreadFunc(void* param) {
|
||||
SThreadInfo* pInfo = (SThreadInfo*)param;
|
||||
|
||||
//################### set key ########################
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
// tmq_conf_set(conf, "td.connect.ip", "localhost");
|
||||
// tmq_conf_set(conf, "td.connect.port", "6030");
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
// tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, pInfo);
|
||||
tmq_conf_set(conf, "group.id", "ombCgrp");
|
||||
// tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||
// tmq_conf_set(conf, "client.id", "c-001");
|
||||
// tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
tmq_conf_set(conf, "enable.auto.commit", "false");
|
||||
// tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
|
||||
// tmq_conf_set(conf, "auto.offset.reset", "none");
|
||||
// tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
//
|
||||
if (g_stConfInfo.useSnapshot) {
|
||||
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||
}
|
||||
|
||||
pInfo->tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
|
||||
tmq_conf_destroy(conf);
|
||||
|
||||
//################### set topic ##########################
|
||||
pInfo->topicList = tmq_list_new();
|
||||
tmq_list_append(pInfo->topicList, g_stConfInfo.topic);
|
||||
|
||||
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
|
||||
taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
|
||||
assert(0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int32_t err = tmq_subscribe(pInfo->tmq, pInfo->topicList);
|
||||
if (err != 0) {
|
||||
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||
taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
|
||||
assert(0);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
tmq_list_destroy(pInfo->topicList);
|
||||
pInfo->topicList = NULL;
|
||||
|
||||
omb_loop_consume(pInfo);
|
||||
|
||||
err = tmq_unsubscribe(pInfo->tmq);
|
||||
if (err != 0) {
|
||||
pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||
taosFprintfFile(g_fp, "tmq_unsubscribe()! reason: %s\n", tmq_err2str(err));
|
||||
}
|
||||
|
||||
err = tmq_consumer_close(pInfo->tmq);
|
||||
if (err != 0) {
|
||||
pError("tmq_consumer_close() fail, reason: %s\n", tmq_err2str(err));
|
||||
taosFprintfFile(g_fp, "tmq_consumer_close()! reason: %s\n", tmq_err2str(err));
|
||||
}
|
||||
pInfo->tmq = NULL;
|
||||
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
|
||||
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type) {
|
||||
TAOS_RES *res = taos_query(taos, command);
|
||||
int32_t code = taos_errno(res);
|
||||
|
||||
if (code != 0) {
|
||||
pPrint("%s Failed to execute <%s>, reason: %s %s", GREEN, command, taos_errstr(res), NC);
|
||||
taos_free_result(res);
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (INSERT_TYPE == type) {
|
||||
int affectedRows = taos_affected_rows(res);
|
||||
taos_free_result(res);
|
||||
return affectedRows;
|
||||
}
|
||||
|
||||
taos_free_result(res);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void* ombProduceThreadFunc(void* param) {
|
||||
SThreadInfo* pInfo = (SThreadInfo*)param;
|
||||
|
||||
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||
if (pInfo->taos == NULL) {
|
||||
printf("taos_connect() fail\n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
int64_t affectedRowsTotal = 0;
|
||||
int64_t sendMsgs = 0;
|
||||
|
||||
uint32_t totalSendLoopTimes = g_stConfInfo.runDurationMinutes * 60 * 1000 / SEND_TIME_UNIT; // send some msgs per 10ms
|
||||
uint32_t batchPerTblTimes = pInfo->producerRate / 100 / g_stConfInfo.batchSize;
|
||||
uint32_t remainder = (pInfo->producerRate / 100) % g_stConfInfo.batchSize;
|
||||
if (remainder) {
|
||||
batchPerTblTimes += 1;
|
||||
}
|
||||
|
||||
char* sqlBuf = taosMemoryMalloc(MAX_SQL_LEN);
|
||||
if (NULL == sqlBuf) {
|
||||
printf("malloc fail for sqlBuf\n");
|
||||
return NULL;
|
||||
}
|
||||
|
||||
printf("Produce Info: totalSendLoopTimes: %d, batchPerTblTimes: %d, producerRate: %d\n", totalSendLoopTimes, batchPerTblTimes, pInfo->producerRate);
|
||||
|
||||
char ctbName[64] = {0};
|
||||
sprintf(ctbName, "%s.ctb%d", g_stConfInfo.dbName, pInfo->consumerId);
|
||||
|
||||
int64_t lastPrintTime = taosGetTimestampUs();
|
||||
int64_t totalMsgLen = 0;
|
||||
//int64_t timeStamp = taosGetTimestampUs();
|
||||
while (totalSendLoopTimes) {
|
||||
int64_t startTs = taosGetTimestampUs();
|
||||
for (int i = 0; i < batchPerTblTimes; ++i) {
|
||||
uint32_t msgsOfSql = g_stConfInfo.batchSize;
|
||||
if ((i == batchPerTblTimes - 1) && (0 != remainder)) {
|
||||
msgsOfSql = remainder;
|
||||
}
|
||||
int len = 0;
|
||||
len += snprintf(sqlBuf+len, MAX_SQL_LEN - len, "insert into %s values ", ctbName);
|
||||
for (int j = 0; j < msgsOfSql; j++) {
|
||||
int64_t timeStamp = taosGetTimestampNs();
|
||||
len += snprintf(sqlBuf+len, MAX_SQL_LEN - len, "(%" PRId64 ", \"%s\")", timeStamp, g_payload);
|
||||
sendMsgs++;
|
||||
pInfo->totalProduceMsgs++;
|
||||
}
|
||||
|
||||
totalMsgLen += len;
|
||||
pInfo->totalMsgsLen += len;
|
||||
|
||||
int64_t affectedRows = queryDbExec(pInfo->taos, sqlBuf, INSERT_TYPE);
|
||||
if (affectedRows < 0) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
affectedRowsTotal += affectedRows;
|
||||
|
||||
//printf("Produce Info: affectedRows: %" PRId64 "\n", affectedRows);
|
||||
}
|
||||
totalSendLoopTimes -= 1;
|
||||
|
||||
// calc spent time
|
||||
int64_t currentTs = taosGetTimestampUs();
|
||||
int64_t delta = currentTs - startTs;
|
||||
if (delta < SEND_TIME_UNIT * 1000) {
|
||||
int64_t sleepLen = (int32_t)(SEND_TIME_UNIT * 1000 - delta);
|
||||
//printf("sleep %" PRId64 " us, use time: %" PRId64 " us\n", sleepLen, delta);
|
||||
taosUsleep((int32_t)sleepLen);
|
||||
}
|
||||
|
||||
currentTs = taosGetTimestampUs();
|
||||
delta = currentTs - lastPrintTime;
|
||||
if (delta > 10 * 1000 * 1000) {
|
||||
printf("producer[%d] info: %" PRId64 " msgs, %" PRId64 " Byte, %" PRId64 " us, totalSendLoopTimes: %d\n",
|
||||
pInfo->consumerId, sendMsgs, totalMsgLen, delta, totalSendLoopTimes);
|
||||
printf("producer[%d] rate: %1.f msgs/s, %1.f KB/s\n",
|
||||
pInfo->consumerId,
|
||||
sendMsgs * 1000.0 * 1000 / delta,
|
||||
(totalMsgLen / 1024.0) / (delta / (1000*1000)));
|
||||
lastPrintTime = currentTs;
|
||||
sendMsgs = 0;
|
||||
totalMsgLen = 0;
|
||||
}
|
||||
}
|
||||
|
||||
printf("affectedRowsTotal: %"PRId64"\n", affectedRowsTotal);
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
||||
void printProduceInfo(int64_t start) {
|
||||
int64_t totalMsgs = 0;
|
||||
int64_t totalLenOfMsgs = 0;
|
||||
for (int i = 0; i < g_stConfInfo.producers; i++) {
|
||||
totalMsgs += g_stConfInfo.stProdThreads[i].totalProduceMsgs;
|
||||
totalLenOfMsgs += g_stConfInfo.stProdThreads[i].totalMsgsLen;
|
||||
}
|
||||
|
||||
int64_t end = taosGetTimestampUs();
|
||||
|
||||
int64_t t = end - start;
|
||||
if (0 == t) t = 1;
|
||||
|
||||
double tInMs = (double)t / 1000000.0;
|
||||
printf("Spent %.3f seconds to prod %" PRIu64 " msgs, %" PRIu64 " Byte\n\n", tInMs, totalMsgs, totalLenOfMsgs);
|
||||
|
||||
|
||||
printf("Spent %.3f seconds to prod %" PRIu64 " msgs with %d producer(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
|
||||
tInMs, totalMsgs, g_stConfInfo.producers,
|
||||
(double)totalMsgs / tInMs,
|
||||
(double)totalLenOfMsgs/(1024.0*1024)/tInMs);
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
void startOmbConsume() {
|
||||
TdThreadAttr thattr;
|
||||
taosThreadAttrInit(&thattr);
|
||||
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||
|
||||
if (0 != g_stConfInfo.producers) {
|
||||
TAOS* taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||
if (taos == NULL) {
|
||||
taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
|
||||
ASSERT(0);
|
||||
return ;
|
||||
}
|
||||
|
||||
char stbName[16] = "stb";
|
||||
char ctbPrefix[16] = "ctb";
|
||||
|
||||
char sql[256] = {0};
|
||||
sprintf(sql, "drop database if exists %s", g_stConfInfo.dbName);
|
||||
printf("SQL: %s\n", sql);
|
||||
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
||||
|
||||
sprintf(sql, "create database if not exists %s precision 'ns' vgroups %d", g_stConfInfo.dbName, g_stConfInfo.producers);
|
||||
printf("SQL: %s\n", sql);
|
||||
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
||||
|
||||
sprintf(sql, "create stable %s.%s (ts timestamp, payload binary(%d)) tags (t bigint) ", g_stConfInfo.dbName, stbName, g_stConfInfo.payloadLen);
|
||||
printf("SQL: %s\n", sql);
|
||||
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
||||
|
||||
for (int i = 0; i < g_stConfInfo.producers; i++) {
|
||||
sprintf(sql, "create table %s.%s%d using %s.stb tags(%d) ", g_stConfInfo.dbName, ctbPrefix, i, g_stConfInfo.dbName, i);
|
||||
printf("SQL: %s\n", sql);
|
||||
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
||||
}
|
||||
|
||||
// create topic
|
||||
sprintf(sql, "create topic %s as stable %s.%s", g_stConfInfo.topic, g_stConfInfo.dbName, stbName);
|
||||
printf("SQL: %s\n", sql);
|
||||
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
||||
|
||||
|
||||
int32_t producerRate = ceil(g_stConfInfo.producerRate / g_stConfInfo.producers);
|
||||
|
||||
printf("==== create %d produce thread ====\n", g_stConfInfo.producers);
|
||||
for (int32_t i = 0; i < g_stConfInfo.producers; ++i) {
|
||||
g_stConfInfo.stProdThreads[i].consumerId = i;
|
||||
g_stConfInfo.stProdThreads[i].producerRate = producerRate;
|
||||
taosThreadCreate(&(g_stConfInfo.stProdThreads[i].thread), &thattr, ombProduceThreadFunc,
|
||||
(void*)(&(g_stConfInfo.stProdThreads[i])));
|
||||
}
|
||||
|
||||
if (0 == g_stConfInfo.numOfThread) {
|
||||
int64_t start = taosGetTimestampUs();
|
||||
for (int32_t i = 0; i < g_stConfInfo.producers; i++) {
|
||||
taosThreadJoin(g_stConfInfo.stProdThreads[i].thread, NULL);
|
||||
taosThreadClear(&g_stConfInfo.stProdThreads[i].thread);
|
||||
}
|
||||
|
||||
printProduceInfo(start);
|
||||
|
||||
taosFprintfFile(g_fp, "==== close tmqlog ====\n");
|
||||
taosCloseFile(&g_fp);
|
||||
return;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// pthread_create one thread to consume
|
||||
taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread);
|
||||
for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
|
||||
g_stConfInfo.stThreads[i].consumerId = i;
|
||||
taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, ombConsumeThreadFunc,
|
||||
(void*)(&(g_stConfInfo.stThreads[i])));
|
||||
}
|
||||
|
||||
int64_t start = taosGetTimestampUs();
|
||||
|
||||
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
|
||||
taosThreadJoin(g_stConfInfo.stThreads[i].thread, NULL);
|
||||
taosThreadClear(&g_stConfInfo.stThreads[i].thread);
|
||||
}
|
||||
|
||||
int64_t end = taosGetTimestampUs();
|
||||
|
||||
int64_t totalRows = 0;
|
||||
int64_t totalMsgs = 0;
|
||||
int64_t totalLenOfMsgs = 0;
|
||||
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
|
||||
totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt;
|
||||
totalLenOfMsgs += g_stConfInfo.stThreads[i].consumeLen;
|
||||
totalRows += g_stConfInfo.stThreads[i].consumeRowCnt;
|
||||
}
|
||||
|
||||
int64_t t = end - start;
|
||||
if (0 == t) t = 1;
|
||||
|
||||
double tInMs = (double)t / 1000000.0;
|
||||
taosFprintfFile(g_fp,
|
||||
"Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
|
||||
tInMs, totalMsgs, g_stConfInfo.numOfThread,
|
||||
(double)(totalMsgs / tInMs),
|
||||
(double)totalLenOfMsgs/(1024*1024)/tInMs);
|
||||
|
||||
printf("Spent %.3f seconds to cons rows: %" PRIu64 " msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
|
||||
tInMs, totalRows, totalMsgs, g_stConfInfo.numOfThread,
|
||||
(double)(totalMsgs / tInMs),
|
||||
(double)totalLenOfMsgs/(1024*1024)/tInMs);
|
||||
|
||||
taosFprintfFile(g_fp, "==== close tmqlog ====\n");
|
||||
taosCloseFile(&g_fp);
|
||||
|
||||
return;
|
||||
}
|
||||
|
||||
|
||||
int main(int32_t argc, char* argv[]) {
|
||||
parseArgument(argc, argv);
|
||||
|
||||
if (0 != strlen(g_stConfInfo.topic)) {
|
||||
startOmbConsume();
|
||||
return 0;
|
||||
}
|
||||
|
||||
getConsumeInfo();
|
||||
saveConfigToLogFile();
|
||||
|
||||
|
|
Loading…
Reference in New Issue