From 6883aeb33c5c040dfac86ed8282ce859f89c9b40 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Thu, 30 Jun 2022 21:25:16 +0800 Subject: [PATCH] test: add test case --- tests/test/c/tmqSim.c | 33 ++++++++++++++++++--------------- 1 file changed, 18 insertions(+), 15 deletions(-) diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 7d57ef0781..542c37b7c9 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -633,8 +633,9 @@ void loop_consume(SThreadInfo* pInfo) { } } - uint64_t lastPrintTime = taosGetTimestampMs(); - uint64_t startTs = taosGetTimestampMs(); + int64_t lastTotalMsgs = 0; + uint64_t lastPrintTime = taosGetTimestampMs(); + uint64_t startTs = taosGetTimestampMs(); int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000); while (running) { @@ -647,20 +648,22 @@ void loop_consume(SThreadInfo* pInfo) { taos_free_result(tmqMsg); totalMsgs++; - - int64_t currentPrintTime = taosGetTimestampMs(); - if (currentPrintTime - lastPrintTime > 10 * 1000) { - taosFprintfFile(g_fp, "consumer id %d has currently poll total msgs: %" PRId64 "\n", pInfo->consumerId, - totalMsgs); - lastPrintTime = currentPrintTime; - } - + + int64_t currentPrintTime = taosGetTimestampMs(); + if (currentPrintTime - lastPrintTime > 10 * 1000) { + taosFprintfFile(g_fp, + "consumer id %d has currently poll total msgs: %" PRId64 ", period rate: %.3f msgs/second\n", + pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0/(currentPrintTime - lastPrintTime)); + lastPrintTime = currentPrintTime; + lastTotalMsgs = totalMsgs; + } + if (0 == once_flag) { once_flag = 1; notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM); } - if (totalRows >= pInfo->expectMsgCnt) { + if ((totalRows >= pInfo->expectMsgCnt) || (totalMsgs >= pInfo->expectMsgCnt)) { char tmpString[128]; taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString)); break; @@ -671,7 +674,7 @@ void loop_consume(SThreadInfo* pInfo) { break; } } - + if (0 == running) { taosFprintfFile(g_fp, "receive stop signal and not continue consume\n"); } @@ -881,11 +884,11 @@ int main(int32_t argc, char* argv[]) { int64_t t = end - start; if (0 == t) t = 1; - + double tInMs = (double)t / 1000000.0; taosFprintfFile(g_fp, - "Spent %.4f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.2f msgs/second\n\n", - tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs)); + "Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/second\n\n", + tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs)); taosFprintfFile(g_fp, "==== close tmqlog ====\n"); taosCloseFile(&g_fp);