[add check tmq consume result]

This commit is contained in:
plum-lihui 2022-03-24 10:13:32 +08:00
parent af560c848f
commit c1f25f3ac7
3 changed files with 34 additions and 13 deletions

View File

@ -244,8 +244,6 @@ endi
# return -1
#endi
return
#===================================================================
#===================================================================

View File

@ -47,9 +47,12 @@ sql drop database useless_db
# -m startTimestamp, default is 1640966400000 [2022-01-01 00:00:00]
# -g showMsgFlag, default is 0
#
#system_content ../../debug/tests/test/c/tmq_demo -c ../../sim/tsim/cfg
system ../../debug/tests/test/c/tmq_demo -c ../../sim/tsim/cfg
print result-> $system_content
print cmd===> system_content ../../debug/tests/test/c/tmq_demo -sim 1 -b 100 -c ../../sim/tsim/cfg -w ../../sim/dnode1/data/vnode/vnode4/wal
system_content ../../debug/tests/test/c/tmq_demo -sim 1 -b 100 -c ../../sim/tsim/cfg -w ../../sim/dnode1/data/vnode/vnode4/wal
print cmd result----> $system_content
if $system_content != @{consume success: 100}@ then
print not match in pos000
endi
sql show databases
print ===> $rows $data00 $data01 $data02 $data03
@ -78,4 +81,5 @@ endi
if $data00 != 10000 then
return -1
endi
#system sh/exec.sh -n dnode1 -s stop -x SIGINT

View File

@ -58,6 +58,7 @@ typedef struct {
int32_t totalRowsOfPerTbl;
int64_t startTimestamp;
int32_t showMsgFlag;
int32_t simCase;
int32_t totalRowsOfT2;
} SConfInfo;
@ -66,7 +67,7 @@ static SConfInfo g_stConfInfo = {
"tmqdb",
"stb",
"./tmqResult.txt", // output_file
"/data2/dnode/data/vnodes/vnode2/wal",
"/data2/dnode/data/vnode/vnode2/wal",
1, // threads
1, // tables
1, // vgroups
@ -77,6 +78,7 @@ static SConfInfo g_stConfInfo = {
10000, // total rows for per table
0, // 2020-01-01 00:00:00.000
0, // show consume msg switch
0, // if run in sim case
10000,
};
@ -117,6 +119,8 @@ static void printHelp() {
printf("%s%s%s%" PRId64 "\n", indent, indent, "startTimestamp, default is ", g_stConfInfo.startTimestamp);
printf("%s%s\n", indent, "-g");
printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
printf("%s%s\n", indent, "-sim");
printf("%s%s%s%d\n", indent, indent, "simCase, default is ", g_stConfInfo.simCase);
exit(EXIT_SUCCESS);
}
@ -160,14 +164,17 @@ void parseArgument(int32_t argc, char *argv[]) {
g_stConfInfo.startTimestamp = atol(argv[++i]);
} else if (strcmp(argv[i], "-g") == 0) {
g_stConfInfo.showMsgFlag = atol(argv[++i]);
} else if (strcmp(argv[i], "-sim") == 0) {
g_stConfInfo.simCase = atol(argv[++i]);
} else {
pPrint("%s unknow para: %s %s", GREEN, argv[++i], NC);
printf("%s unknow para: %s %s", GREEN, argv[++i], NC);
exit(-1);
}
}
g_stConfInfo.totalRowsOfT2 = g_stConfInfo.totalRowsOfPerTbl * g_stConfInfo.ratio;
#if 0
pPrint("%s configDir:%s %s", GREEN, configDir, NC);
pPrint("%s dbName:%s %s", GREEN, g_stConfInfo.dbName, NC);
pPrint("%s stbName:%s %s", GREEN, g_stConfInfo.stbName, NC);
@ -184,6 +191,7 @@ void parseArgument(int32_t argc, char *argv[]) {
pPrint("%s totalRowsOfT2:%d %s", GREEN, g_stConfInfo.totalRowsOfT2, NC);
pPrint("%s startTimestamp:%" PRId64" %s", GREEN, g_stConfInfo.startTimestamp, NC);
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
#endif
}
static int running = 1;
@ -429,15 +437,21 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics, int32_t totalMsgs, int64_t walLog
double consumeTime = (double)(endTime - startTime) / 1000000;
if (batchCnt != totalMsgs) {
pPrint("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC);
printf("%s inserted msgs: %d and consume msgs: %d mismatch %s", GREEN, totalMsgs, batchCnt, NC);
exit(-1);
}
pPrint("consume result: msgs: %d, skip log cnt: %d, time used:%.3f second\n", batchCnt, skipLogNum, consumeTime);
if (0 == g_stConfInfo.simCase) {
printf("consume result: msgs: %d, skip log cnt: %d, time used:%.3f second\n", batchCnt, skipLogNum, consumeTime);
} else {
printf("{consume success: %d}", totalMsgs);
}
taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.2f| %10.2f |\n", batchCnt, consumeTime, (double)batchCnt / consumeTime, (double)walLogSize / (1024 * 1024.0) / consumeTime, (double)walLogSize / 1024.0 / batchCnt);
err = tmq_consumer_close(tmq);
if (err) {
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err));
exit(-1);
}
}
@ -679,12 +693,17 @@ int main(int32_t argc, char *argv[]) {
walLogSize = getDirectorySize(g_stConfInfo.vnodeWalPath);
if (walLogSize <= 0) {
pError("vnode2/wal size incorrect!");
printf("vnode2/wal size incorrect!");
exit(-1);
} else {
if (0 == g_stConfInfo.simCase) {
pPrint(".log file size in vnode2/wal: %.3f MBytes\n", (double)walLogSize/(1024 * 1024.0));
}
}
if (0 == g_stConfInfo.simCase) {
pPrint("insert result: %d rows, %d msgs, time:%.3f sec, speed:%.1f rows/second, %.1f msgs/second\n", totalRows, totalMsgs, seconds, rowsSpeed, msgsSpeed);
}
taosFprintfFile(g_fp, "|%10d | %10.3f | %8.2f | %10.3f ", totalMsgs, seconds, msgsSpeed, (double)walLogSize/(1024 * 1024.0));
}