test: modify tmqSim for get timestamp
This commit is contained in:
parent
bca2428cdf
commit
6158a12a44
|
@ -36,7 +36,6 @@
|
||||||
#define MAX_CONSUMER_THREAD_CNT (16)
|
#define MAX_CONSUMER_THREAD_CNT (16)
|
||||||
#define MAX_VGROUP_CNT (32)
|
#define MAX_VGROUP_CNT (32)
|
||||||
|
|
||||||
int64_t now;
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
NOTIFY_CMD_START_CONSUM,
|
NOTIFY_CMD_START_CONSUM,
|
||||||
NOTIFY_CMD_START_COMMIT,
|
NOTIFY_CMD_START_COMMIT,
|
||||||
|
@ -91,6 +90,7 @@ typedef struct {
|
||||||
int32_t consumeDelay; // unit s
|
int32_t consumeDelay; // unit s
|
||||||
int32_t numOfThread;
|
int32_t numOfThread;
|
||||||
int32_t useSnapshot;
|
int32_t useSnapshot;
|
||||||
|
int64_t nowTime;
|
||||||
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
|
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
|
||||||
} SConfInfo;
|
} SConfInfo;
|
||||||
|
|
||||||
|
@ -199,6 +199,8 @@ void parseArgument(int32_t argc, char* argv[]) {
|
||||||
g_stConfInfo.saveRowFlag = 0;
|
g_stConfInfo.saveRowFlag = 0;
|
||||||
g_stConfInfo.consumeDelay = 5;
|
g_stConfInfo.consumeDelay = 5;
|
||||||
|
|
||||||
|
g_stConfInfo.nowTime = taosGetTimestampMs();
|
||||||
|
|
||||||
for (int32_t i = 1; i < argc; i++) {
|
for (int32_t i = 1; i < argc; i++) {
|
||||||
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
|
if (strcmp(argv[i], "-h") == 0 || strcmp(argv[i], "--help") == 0) {
|
||||||
printHelp();
|
printHelp();
|
||||||
|
@ -511,10 +513,8 @@ static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {}
|
||||||
int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
|
int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
|
||||||
char sqlStr[1024] = {0};
|
char sqlStr[1024] = {0};
|
||||||
|
|
||||||
int64_t now = taosGetTimestampMs();
|
|
||||||
|
|
||||||
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
||||||
sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName, now, cmdId,
|
sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName, atomic_fetch_add_64(&g_stConfInfo.nowTime, 1), cmdId,
|
||||||
pInfo->consumerId);
|
pInfo->consumerId);
|
||||||
|
|
||||||
taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);
|
taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);
|
||||||
|
@ -591,7 +591,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
|
||||||
char sqlStr[1024] = {0};
|
char sqlStr[1024] = {0};
|
||||||
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
||||||
sprintf(sqlStr, "insert into %s.consumeresult values (%" PRId64 ", %d, %" PRId64 ", %" PRId64 ", %d)",
|
sprintf(sqlStr, "insert into %s.consumeresult values (%" PRId64 ", %d, %" PRId64 ", %" PRId64 ", %d)",
|
||||||
g_stConfInfo.cdbName, atomic_fetch_add_64(&now, 1), pInfo->consumerId, pInfo->consumeMsgCnt,
|
g_stConfInfo.cdbName, atomic_fetch_add_64(&g_stConfInfo.nowTime, 1), pInfo->consumerId, pInfo->consumeMsgCnt,
|
||||||
pInfo->consumeRowCnt, pInfo->checkresult);
|
pInfo->consumeRowCnt, pInfo->checkresult);
|
||||||
|
|
||||||
char tmpString[128];
|
char tmpString[128];
|
||||||
|
@ -855,8 +855,6 @@ int32_t getConsumeInfo() {
|
||||||
}
|
}
|
||||||
|
|
||||||
int main(int32_t argc, char* argv[]) {
|
int main(int32_t argc, char* argv[]) {
|
||||||
now = taosGetTimestampMs();
|
|
||||||
|
|
||||||
parseArgument(argc, argv);
|
parseArgument(argc, argv);
|
||||||
getConsumeInfo();
|
getConsumeInfo();
|
||||||
saveConfigToLogFile();
|
saveConfigToLogFile();
|
||||||
|
|
Loading…
Reference in New Issue