This commit is contained in:
plum-lihui 2022-03-29 10:01:50 +08:00
parent 2a012b6133
commit 52d33d3910
2 changed files with 5 additions and 31 deletions

View File

@ -67,7 +67,7 @@ static SConfInfo g_stConfInfo = {
"tmqdb", "tmqdb",
"stb", "stb",
"./tmqResult.txt", // output_file "./tmqResult.txt", // output_file
"/data2/dnode/data/vnode/vnode2/wal", "", // /data2/dnode/data/vnode/vnode2/wal",
1, // threads 1, // threads
1, // tables 1, // tables
1, // vgroups 1, // vgroups
@ -662,7 +662,7 @@ int main(int32_t argc, char *argv[]) {
float msgsSpeed = totalMsgs / seconds; float msgsSpeed = totalMsgs / seconds;
if (0 == g_stConfInfo.simCase) { if ((0 == g_stConfInfo.simCase) && (strlen(g_stConfInfo.vnodeWalPath))) {
walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath); walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath);
if (walLogSize <= 0) { if (walLogSize <= 0) {
printf("%s size incorrect!", g_stConfInfo.vnodeWalPath); printf("%s size incorrect!", g_stConfInfo.vnodeWalPath);

View File

@ -98,7 +98,7 @@ void parseArgument(int32_t argc, char *argv[]) {
} }
} }
#if 1 #if 0
pPrint("%s configDir:%s %s", GREEN, configDir, NC); pPrint("%s configDir:%s %s", GREEN, configDir, NC);
pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC); pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
pPrint("%s topicString:%s %s", GREEN, g_stConfInfo.topicString, NC); pPrint("%s topicString:%s %s", GREEN, g_stConfInfo.topicString, NC);
@ -198,6 +198,8 @@ tmq_t* build_consumer() {
TAOS_RES* pRes = taos_query(pConn, sqlStr); TAOS_RES* pRes = taos_query(pConn, sqlStr);
if (taos_errno(pRes) != 0) { if (taos_errno(pRes) != 0) {
printf("error in use db, reason:%s\n", taos_errstr(pRes)); printf("error in use db, reason:%s\n", taos_errstr(pRes));
taos_free_result(pRes);
exit(-1);
} }
taos_free_result(pRes); taos_free_result(pRes);
@ -219,34 +221,6 @@ tmq_list_t* build_topic_list() {
return topic_list; return topic_list;
} }
void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
static const int MIN_COMMIT_COUNT = 1000;
int msg_count = 0;
tmq_resp_err_t err;
if ((err = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err));
return;
}
while (running) {
tmq_message_t* tmqmessage = tmq_consumer_poll(tmq, 1);
if (tmqmessage) {
msg_process(tmqmessage);
tmq_message_destroy(tmqmessage);
if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);
}
}
err = tmq_consumer_close(tmq);
if (err)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
else
fprintf(stderr, "%% Consumer closed\n");
}
void perf_loop(tmq_t* tmq, tmq_list_t* topics) { void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
tmq_resp_err_t err; tmq_resp_err_t err;