Merge pull request #12714 from taosdata/test3.0/lihui
test: modify tmq_sim processer
This commit is contained in:
commit
9668fed8b6
|
@ -98,16 +98,28 @@ static void printHelp() {
|
||||||
exit(EXIT_SUCCESS);
|
exit(EXIT_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
void initLogFile() {
|
char* getCurrentTimeString(char* timeString) {
|
||||||
time_t now;
|
time_t tTime = taosGetTimestampSec();
|
||||||
struct tm curTime;
|
struct tm tm = *taosLocalTime(&tTime, NULL);
|
||||||
char filename[256];
|
sprintf(timeString, "%d-%02d-%02d %02d:%02d:%02d",
|
||||||
|
tm.tm_year + 1900,
|
||||||
|
tm.tm_mon + 1,
|
||||||
|
tm.tm_mday,
|
||||||
|
tm.tm_hour,
|
||||||
|
tm.tm_min,
|
||||||
|
tm.tm_sec);
|
||||||
|
|
||||||
|
return timeString;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void initLogFile() {
|
||||||
|
char filename[256];
|
||||||
|
char tmpString[128];
|
||||||
|
|
||||||
|
sprintf(filename,"%s/../log/tmqlog_%s.txt", configDir, getCurrentTimeString(tmpString));
|
||||||
|
//sprintf(filename, "%s/../log/tmqlog.txt", configDir);
|
||||||
|
|
||||||
now = taosTime(NULL);
|
|
||||||
taosLocalTime(&now, &curTime);
|
|
||||||
sprintf(filename, "%s/../log/tmqlog_%04d-%02d-%02d %02d-%02d-%02d.txt", configDir, curTime.tm_year + 1900,
|
|
||||||
curTime.tm_mon + 1, curTime.tm_mday, curTime.tm_hour, curTime.tm_min, curTime.tm_sec);
|
|
||||||
// sprintf(filename, "%s/../log/tmqlog.txt", configDir);
|
|
||||||
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
TdFilePtr pFile = taosOpenFile(filename, TD_FILE_TEXT | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||||
if (NULL == pFile) {
|
if (NULL == pFile) {
|
||||||
fprintf(stderr, "Failed to open %s for save result\n", filename);
|
fprintf(stderr, "Failed to open %s for save result\n", filename);
|
||||||
|
@ -117,9 +129,6 @@ void initLogFile() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void saveConfigToLogFile() {
|
void saveConfigToLogFile() {
|
||||||
time_t tTime = taosGetTimestampSec();
|
|
||||||
struct tm tm = *taosLocalTime(&tTime, NULL);
|
|
||||||
|
|
||||||
taosFprintfFile(g_fp, "###################################################################\n");
|
taosFprintfFile(g_fp, "###################################################################\n");
|
||||||
taosFprintfFile(g_fp, "# configDir: %s\n", configDir);
|
taosFprintfFile(g_fp, "# configDir: %s\n", configDir);
|
||||||
taosFprintfFile(g_fp, "# dbName: %s\n", g_stConfInfo.dbName);
|
taosFprintfFile(g_fp, "# dbName: %s\n", g_stConfInfo.dbName);
|
||||||
|
@ -144,10 +153,11 @@ void saveConfigToLogFile() {
|
||||||
taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]);
|
taosFprintfFile(g_fp, "%s:%s, ", g_stConfInfo.stThreads[i].key[k], g_stConfInfo.stThreads[i].value[k]);
|
||||||
}
|
}
|
||||||
taosFprintfFile(g_fp, "\n");
|
taosFprintfFile(g_fp, "\n");
|
||||||
|
taosFprintfFile(g_fp, " expect rows: %d\n", g_stConfInfo.stThreads[i].expectMsgCnt);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosFprintfFile(g_fp, "# Test time: %d-%02d-%02d %02d:%02d:%02d\n", tm.tm_year + 1900, tm.tm_mon + 1,
|
char tmpString[128];
|
||||||
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec);
|
taosFprintfFile(g_fp, "# Test time: %s\n", getCurrentTimeString(tmpString));
|
||||||
taosFprintfFile(g_fp, "###################################################################\n");
|
taosFprintfFile(g_fp, "###################################################################\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -316,10 +326,8 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
|
||||||
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName,
|
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName,
|
||||||
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
|
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
|
||||||
|
|
||||||
time_t tTime = taosGetTimestampSec();
|
char tmpString[128];
|
||||||
struct tm tm = *taosLocalTime(&tTime, NULL);
|
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId ,sqlStr);
|
||||||
taosFprintfFile(g_fp, "# save result: %d-%02d-%02d %02d:%02d:%02d, sql: %s\n", tm.tm_year + 1900, tm.tm_mon + 1,
|
|
||||||
tm.tm_mday, tm.tm_hour, tm.tm_min, tm.tm_sec, sqlStr);
|
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
|
@ -339,6 +347,9 @@ void loop_consume(SThreadInfo* pInfo) {
|
||||||
int64_t totalMsgs = 0;
|
int64_t totalMsgs = 0;
|
||||||
int64_t totalRows = 0;
|
int64_t totalRows = 0;
|
||||||
|
|
||||||
|
char tmpString[128];
|
||||||
|
taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), pInfo->consumerId);
|
||||||
|
|
||||||
while (running) {
|
while (running) {
|
||||||
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
|
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
|
||||||
if (tmqMsg) {
|
if (tmqMsg) {
|
||||||
|
@ -351,11 +362,13 @@ void loop_consume(SThreadInfo* pInfo) {
|
||||||
totalMsgs++;
|
totalMsgs++;
|
||||||
|
|
||||||
if (totalRows >= pInfo->expectMsgCnt) {
|
if (totalRows >= pInfo->expectMsgCnt) {
|
||||||
taosFprintfFile(g_fp, "==== totalRows >= pInfo->expectMsgCnt, so break\n");
|
char tmpString[128];
|
||||||
|
taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
taosFprintfFile(g_fp, "==== delay over time, so break\n");
|
char tmpString[128];
|
||||||
|
taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue