test: add test case
This commit is contained in:
parent
10dcf571ce
commit
6883aeb33c
|
@ -633,8 +633,9 @@ void loop_consume(SThreadInfo* pInfo) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t lastPrintTime = taosGetTimestampMs();
|
int64_t lastTotalMsgs = 0;
|
||||||
uint64_t startTs = taosGetTimestampMs();
|
uint64_t lastPrintTime = taosGetTimestampMs();
|
||||||
|
uint64_t startTs = taosGetTimestampMs();
|
||||||
|
|
||||||
int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
|
int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
|
||||||
while (running) {
|
while (running) {
|
||||||
|
@ -648,19 +649,21 @@ void loop_consume(SThreadInfo* pInfo) {
|
||||||
|
|
||||||
totalMsgs++;
|
totalMsgs++;
|
||||||
|
|
||||||
int64_t currentPrintTime = taosGetTimestampMs();
|
int64_t currentPrintTime = taosGetTimestampMs();
|
||||||
if (currentPrintTime - lastPrintTime > 10 * 1000) {
|
if (currentPrintTime - lastPrintTime > 10 * 1000) {
|
||||||
taosFprintfFile(g_fp, "consumer id %d has currently poll total msgs: %" PRId64 "\n", pInfo->consumerId,
|
taosFprintfFile(g_fp,
|
||||||
totalMsgs);
|
"consumer id %d has currently poll total msgs: %" PRId64 ", period rate: %.3f msgs/second\n",
|
||||||
lastPrintTime = currentPrintTime;
|
pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0/(currentPrintTime - lastPrintTime));
|
||||||
}
|
lastPrintTime = currentPrintTime;
|
||||||
|
lastTotalMsgs = totalMsgs;
|
||||||
|
}
|
||||||
|
|
||||||
if (0 == once_flag) {
|
if (0 == once_flag) {
|
||||||
once_flag = 1;
|
once_flag = 1;
|
||||||
notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
|
notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (totalRows >= pInfo->expectMsgCnt) {
|
if ((totalRows >= pInfo->expectMsgCnt) || (totalMsgs >= pInfo->expectMsgCnt)) {
|
||||||
char tmpString[128];
|
char tmpString[128];
|
||||||
taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
|
taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
|
||||||
break;
|
break;
|
||||||
|
@ -884,8 +887,8 @@ int main(int32_t argc, char* argv[]) {
|
||||||
|
|
||||||
double tInMs = (double)t / 1000000.0;
|
double tInMs = (double)t / 1000000.0;
|
||||||
taosFprintfFile(g_fp,
|
taosFprintfFile(g_fp,
|
||||||
"Spent %.4f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.2f msgs/second\n\n",
|
"Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/second\n\n",
|
||||||
tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs));
|
tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs));
|
||||||
|
|
||||||
taosFprintfFile(g_fp, "==== close tmqlog ====\n");
|
taosFprintfFile(g_fp, "==== close tmqlog ====\n");
|
||||||
taosCloseFile(&g_fp);
|
taosCloseFile(&g_fp);
|
||||||
|
|
Loading…
Reference in New Issue