Merge pull request #18489 from taosdata/fix/TD-20318
test: add asan case and fix tmqSim error
This commit is contained in:
commit
791f0bd276
|
@ -687,9 +687,9 @@
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqError.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqError.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/schema.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/schema.py
|
||||||
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbFilter.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbFilter.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqCheckData.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqCheckData.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqCheckData1.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqCheckData1.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqConsumerGroup.py
|
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqConsumerGroup.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqShow.py
|
,,,system-test,python3 ./test.py -f 7-tmq/tmqShow.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqAlterSchema.py
|
,,,system-test,python3 ./test.py -f 7-tmq/tmqAlterSchema.py
|
||||||
,,,system-test,python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py
|
,,,system-test,python3 ./test.py -f 7-tmq/tmqConsFromTsdb.py
|
||||||
|
|
|
@ -128,12 +128,12 @@ class TMQCom:
|
||||||
os.system(shellCmd)
|
os.system(shellCmd)
|
||||||
|
|
||||||
def stopTmqSimProcess(self, processorName):
|
def stopTmqSimProcess(self, processorName):
|
||||||
psCmd = "ps -ef|grep -w %s|grep -v grep | awk '{print $2}'"%(processorName)
|
psCmd = "unset LD_PRELOAD; ps -ef|grep -w %s|grep -v grep | awk '{print $2}'"%(processorName)
|
||||||
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
|
processID = subprocess.check_output(psCmd, shell=True).decode("utf-8")
|
||||||
onlyKillOnceWindows = 0
|
onlyKillOnceWindows = 0
|
||||||
while(processID):
|
while(processID):
|
||||||
if not platform.system().lower() == 'windows' or (onlyKillOnceWindows == 0 and platform.system().lower() == 'windows'):
|
if not platform.system().lower() == 'windows' or (onlyKillOnceWindows == 0 and platform.system().lower() == 'windows'):
|
||||||
killCmd = "kill -INT %s > /dev/null 2>&1" % processID
|
killCmd = "unset LD_PRELOAD; kill -INT %s > /dev/null 2>&1" % processID
|
||||||
os.system(killCmd)
|
os.system(killCmd)
|
||||||
onlyKillOnceWindows = 1
|
onlyKillOnceWindows = 1
|
||||||
time.sleep(0.2)
|
time.sleep(0.2)
|
||||||
|
|
|
@ -14,13 +14,13 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <assert.h>
|
#include <assert.h>
|
||||||
|
#include <math.h>
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
#include <sys/stat.h>
|
#include <sys/stat.h>
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <time.h>
|
#include <time.h>
|
||||||
#include <math.h>
|
|
||||||
|
|
||||||
#include "taos.h"
|
#include "taos.h"
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
|
@ -36,7 +36,7 @@
|
||||||
#define MAX_ROW_STR_LEN (16 * 1024)
|
#define MAX_ROW_STR_LEN (16 * 1024)
|
||||||
#define MAX_CONSUMER_THREAD_CNT (16)
|
#define MAX_CONSUMER_THREAD_CNT (16)
|
||||||
#define MAX_VGROUP_CNT (32)
|
#define MAX_VGROUP_CNT (32)
|
||||||
#define SEND_TIME_UNIT 10 // ms
|
#define SEND_TIME_UNIT 10 // ms
|
||||||
#define MAX_SQL_LEN 1048576
|
#define MAX_SQL_LEN 1048576
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
|
@ -45,11 +45,7 @@ typedef enum {
|
||||||
NOTIFY_CMD_ID_BUTT,
|
NOTIFY_CMD_ID_BUTT,
|
||||||
} NOTIFY_CMD_ID;
|
} NOTIFY_CMD_ID;
|
||||||
|
|
||||||
typedef enum enumQUERY_TYPE {
|
typedef enum enumQUERY_TYPE { NO_INSERT_TYPE, INSERT_TYPE, QUERY_TYPE_BUT } QUERY_TYPE;
|
||||||
NO_INSERT_TYPE,
|
|
||||||
INSERT_TYPE,
|
|
||||||
QUERY_TYPE_BUT
|
|
||||||
} QUERY_TYPE;
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
|
@ -61,7 +57,7 @@ typedef struct {
|
||||||
// char autoOffsetRest[16]; // none, earliest, latest
|
// char autoOffsetRest[16]; // none, earliest, latest
|
||||||
|
|
||||||
TdFilePtr pConsumeRowsFile;
|
TdFilePtr pConsumeRowsFile;
|
||||||
TdFilePtr pConsumeMetaFile;
|
TdFilePtr pConsumeMetaFile;
|
||||||
int32_t ifCheckData;
|
int32_t ifCheckData;
|
||||||
int64_t expectMsgCnt;
|
int64_t expectMsgCnt;
|
||||||
|
|
||||||
|
@ -87,12 +83,12 @@ typedef struct {
|
||||||
int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume
|
int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume
|
||||||
int64_t ts;
|
int64_t ts;
|
||||||
|
|
||||||
TAOS* taos;
|
TAOS* taos;
|
||||||
|
|
||||||
// below parameters is used by omb test
|
// below parameters is used by omb test
|
||||||
int32_t producerRate; // unit: msgs/s
|
int32_t producerRate; // unit: msgs/s
|
||||||
int64_t totalProduceMsgs;
|
int64_t totalProduceMsgs;
|
||||||
int64_t totalMsgsLen;
|
int64_t totalMsgsLen;
|
||||||
|
|
||||||
} SThreadInfo;
|
} SThreadInfo;
|
||||||
|
|
||||||
|
@ -112,12 +108,12 @@ typedef struct {
|
||||||
SThreadInfo stProdThreads[MAX_CONSUMER_THREAD_CNT];
|
SThreadInfo stProdThreads[MAX_CONSUMER_THREAD_CNT];
|
||||||
|
|
||||||
// below parameters is used by omb test
|
// below parameters is used by omb test
|
||||||
char topic[64];
|
char topic[64];
|
||||||
int32_t producers;
|
int32_t producers;
|
||||||
int32_t producerRate;
|
int32_t producerRate;
|
||||||
int32_t runDurationMinutes;
|
int32_t runDurationMinutes;
|
||||||
int32_t batchSize;
|
int32_t batchSize;
|
||||||
int32_t payloadLen;
|
int32_t payloadLen;
|
||||||
} SConfInfo;
|
} SConfInfo;
|
||||||
|
|
||||||
static SConfInfo g_stConfInfo;
|
static SConfInfo g_stConfInfo;
|
||||||
|
@ -146,14 +142,13 @@ static void printHelp() {
|
||||||
printf("%s%s%s%ds\n", indent, indent, "consume delay, default is ", g_stConfInfo.consumeDelay);
|
printf("%s%s%s%ds\n", indent, indent, "consume delay, default is ", g_stConfInfo.consumeDelay);
|
||||||
printf("%s%s\n", indent, "-e");
|
printf("%s%s\n", indent, "-e");
|
||||||
printf("%s%s%s%d\n", indent, indent, "snapshot, default is ", g_stConfInfo.useSnapshot);
|
printf("%s%s%s%d\n", indent, indent, "snapshot, default is ", g_stConfInfo.useSnapshot);
|
||||||
|
|
||||||
printf("%s%s\n", indent, "-t");
|
printf("%s%s\n", indent, "-t");
|
||||||
printf("%s%s%s\n", indent, indent, "topic name, default is null");
|
printf("%s%s%s\n", indent, indent, "topic name, default is null");
|
||||||
|
|
||||||
printf("%s%s\n", indent, "-x");
|
printf("%s%s\n", indent, "-x");
|
||||||
printf("%s%s%s\n", indent, indent, "consume thread number, default is 1");
|
printf("%s%s%s\n", indent, indent, "consume thread number, default is 1");
|
||||||
|
|
||||||
|
|
||||||
printf("%s%s\n", indent, "-l");
|
printf("%s%s\n", indent, "-l");
|
||||||
printf("%s%s%s%d\n", indent, indent, "run duration unit is minutes, default is ", g_stConfInfo.runDurationMinutes);
|
printf("%s%s%s%d\n", indent, indent, "run duration unit is minutes, default is ", g_stConfInfo.runDurationMinutes);
|
||||||
printf("%s%s\n", indent, "-p");
|
printf("%s%s\n", indent, "-p");
|
||||||
|
@ -165,7 +160,6 @@ static void printHelp() {
|
||||||
printf("%s%s\n", indent, "-n");
|
printf("%s%s\n", indent, "-n");
|
||||||
printf("%s%s%s\n", indent, indent, "payload len unit is byte, default is 1000");
|
printf("%s%s%s\n", indent, indent, "payload len unit is byte, default is 1000");
|
||||||
|
|
||||||
|
|
||||||
exit(EXIT_SUCCESS);
|
exit(EXIT_SUCCESS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -194,7 +188,7 @@ void initLogFile() {
|
||||||
pid_t process_id = getpid();
|
pid_t process_id = getpid();
|
||||||
|
|
||||||
if (0 != strlen(g_stConfInfo.topic)) {
|
if (0 != strlen(g_stConfInfo.topic)) {
|
||||||
sprintf(filename, "/tmp/tmqlog-%d-%s.txt", process_id, getCurrentTimeString(tmpString));
|
sprintf(filename, "/tmp/tmqlog-%d-%s.txt", process_id, getCurrentTimeString(tmpString));
|
||||||
} else {
|
} else {
|
||||||
sprintf(filename, "%s/../log/tmqlog-%d-%s.txt", configDir, process_id, getCurrentTimeString(tmpString));
|
sprintf(filename, "%s/../log/tmqlog-%d-%s.txt", configDir, process_id, getCurrentTimeString(tmpString));
|
||||||
}
|
}
|
||||||
|
@ -294,7 +288,7 @@ void parseArgument(int32_t argc, char* argv[]) {
|
||||||
g_stConfInfo.producerRate = atol(argv[++i]);
|
g_stConfInfo.producerRate = atol(argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-n") == 0) {
|
} else if (strcmp(argv[i], "-n") == 0) {
|
||||||
g_stConfInfo.payloadLen = atol(argv[++i]);
|
g_stConfInfo.payloadLen = atol(argv[++i]);
|
||||||
if(g_stConfInfo.payloadLen <= 0 || g_stConfInfo.payloadLen > 1024 * 1024 * 1024){
|
if (g_stConfInfo.payloadLen <= 0 || g_stConfInfo.payloadLen > 1024 * 1024 * 1024) {
|
||||||
pError("%s calloc size is too large: %s %s", GREEN, argv[++i], NC);
|
pError("%s calloc size is too large: %s %s", GREEN, argv[++i], NC);
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
@ -357,9 +351,9 @@ void ltrim(char* str) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int queryDB(TAOS* taos, char* command) {
|
int queryDB(TAOS* taos, char* command) {
|
||||||
int retryCnt = 10;
|
int retryCnt = 10;
|
||||||
int code = 0;
|
int code = 0;
|
||||||
TAOS_RES* pRes = NULL;
|
TAOS_RES* pRes = NULL;
|
||||||
|
|
||||||
while (retryCnt--) {
|
while (retryCnt--) {
|
||||||
pRes = taos_query(taos, command);
|
pRes = taos_query(taos, command);
|
||||||
|
@ -379,7 +373,6 @@ int queryDB(TAOS* taos, char* command) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
|
void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
|
||||||
int32_t i;
|
int32_t i;
|
||||||
for (i = 0; i < pInfo->numOfVgroups; i++) {
|
for (i = 0; i < pInfo->numOfVgroups; i++) {
|
||||||
|
@ -403,22 +396,21 @@ void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TAOS* createNewTaosConnect() {
|
TAOS* createNewTaosConnect() {
|
||||||
TAOS* taos = NULL;
|
TAOS* taos = NULL;
|
||||||
int32_t retryCnt = 10;
|
int32_t retryCnt = 10;
|
||||||
|
|
||||||
while (retryCnt--) {
|
while (retryCnt--) {
|
||||||
TAOS* taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
TAOS* taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||||
if (NULL != taos) {
|
if (NULL != taos) {
|
||||||
return taos;
|
return taos;
|
||||||
}
|
}
|
||||||
taosSsleep(1);
|
taosSsleep(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
taosFprintfFile(g_fp, "taos_connect() fail\n");
|
taosFprintfFile(g_fp, "taos_connect() fail\n");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
|
int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
|
||||||
char sqlStr[1100] = {0};
|
char sqlStr[1100] = {0};
|
||||||
|
|
||||||
|
@ -440,7 +432,7 @@ int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
|
||||||
if (retCode != 0) {
|
if (retCode != 0) {
|
||||||
taosFprintfFile(g_fp, "error in save consume content\n");
|
taosFprintfFile(g_fp, "error in save consume content\n");
|
||||||
taosCloseFile(&g_fp);
|
taosCloseFile(&g_fp);
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
exit(-1);
|
exit(-1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -481,7 +473,7 @@ static char* shellFormatTimestamp(char* buf, int64_t val, int32_t precision) {
|
||||||
|
|
||||||
struct tm ptm;
|
struct tm ptm;
|
||||||
taosLocalTime(&tt, &ptm);
|
taosLocalTime(&tt, &ptm);
|
||||||
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);
|
size_t pos = strftime(buf, 35, "%Y-%m-%d %H:%M:%S", &ptm);
|
||||||
|
|
||||||
if (precision == TSDB_TIME_PRECISION_NANO) {
|
if (precision == TSDB_TIME_PRECISION_NANO) {
|
||||||
sprintf(buf + pos, ".%09d", ms);
|
sprintf(buf + pos, ".%09d", ms);
|
||||||
|
@ -548,22 +540,20 @@ static void shellDumpFieldToFile(TdFilePtr pFile, const char* val, TAOS_FIELD* f
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_BINARY:
|
case TSDB_DATA_TYPE_BINARY:
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
case TSDB_DATA_TYPE_JSON:
|
case TSDB_DATA_TYPE_JSON: {
|
||||||
{
|
int32_t bufIndex = 0;
|
||||||
int32_t bufIndex = 0;
|
for (int32_t i = 0; i < length; i++) {
|
||||||
for (int32_t i = 0; i < length; i++) {
|
buf[bufIndex] = val[i];
|
||||||
|
bufIndex++;
|
||||||
|
if (val[i] == '\"') {
|
||||||
buf[bufIndex] = val[i];
|
buf[bufIndex] = val[i];
|
||||||
bufIndex++;
|
bufIndex++;
|
||||||
if (val[i] == '\"') {
|
|
||||||
buf[bufIndex] = val[i];
|
|
||||||
bufIndex++;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
buf[bufIndex] = 0;
|
|
||||||
|
|
||||||
taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr);
|
|
||||||
}
|
}
|
||||||
break;
|
buf[bufIndex] = 0;
|
||||||
|
|
||||||
|
taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr);
|
||||||
|
} break;
|
||||||
case TSDB_DATA_TYPE_TIMESTAMP:
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
shellFormatTimestamp(buf, *(int64_t*)val, precision);
|
shellFormatTimestamp(buf, *(int64_t*)val, precision);
|
||||||
taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr);
|
taosFprintfFile(pFile, "%s%s%s", quotationStr, buf, quotationStr);
|
||||||
|
@ -635,7 +625,6 @@ static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
||||||
return totalRows;
|
return totalRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
|
static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
|
||||||
char buf[1024];
|
char buf[1024];
|
||||||
int32_t totalRows = 0;
|
int32_t totalRows = 0;
|
||||||
|
@ -650,24 +639,24 @@ static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIn
|
||||||
|
|
||||||
{
|
{
|
||||||
tmq_raw_data raw = {0};
|
tmq_raw_data raw = {0};
|
||||||
int32_t code = tmq_get_raw(msg, &raw);
|
int32_t code = tmq_get_raw(msg, &raw);
|
||||||
|
|
||||||
if(code == TSDB_CODE_SUCCESS){
|
if (code == TSDB_CODE_SUCCESS) {
|
||||||
// int retCode = queryDB(pInfo->taos, "use metadb");
|
// int retCode = queryDB(pInfo->taos, "use metadb");
|
||||||
// if (retCode != 0) {
|
// if (retCode != 0) {
|
||||||
// taosFprintfFile(g_fp, "error when use metadb\n");
|
// taosFprintfFile(g_fp, "error when use metadb\n");
|
||||||
// taosCloseFile(&g_fp);
|
// taosCloseFile(&g_fp);
|
||||||
// exit(-1);
|
// exit(-1);
|
||||||
// }
|
// }
|
||||||
// taosFprintfFile(g_fp, "raw:%p\n", &raw);
|
// taosFprintfFile(g_fp, "raw:%p\n", &raw);
|
||||||
//
|
//
|
||||||
// tmq_write_raw(pInfo->taos, raw);
|
// tmq_write_raw(pInfo->taos, raw);
|
||||||
}
|
}
|
||||||
|
|
||||||
char* result = tmq_get_json_meta(msg);
|
char* result = tmq_get_json_meta(msg);
|
||||||
if(result && strcmp(result, "") != 0){
|
if (result && strcmp(result, "") != 0) {
|
||||||
//printf("meta result: %s\n", result);
|
// printf("meta result: %s\n", result);
|
||||||
taosFprintfFile(pInfo->pConsumeMetaFile, "%s\n", result);
|
taosFprintfFile(pInfo->pConsumeMetaFile, "%s\n", result);
|
||||||
}
|
}
|
||||||
tmq_free_json_meta(result);
|
tmq_free_json_meta(result);
|
||||||
}
|
}
|
||||||
|
@ -683,8 +672,8 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
|
||||||
char sqlStr[1024] = {0};
|
char sqlStr[1024] = {0};
|
||||||
|
|
||||||
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
||||||
sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName, atomic_fetch_add_64(&g_stConfInfo.nowTime, 1), cmdId,
|
sprintf(sqlStr, "insert into %s.notifyinfo values (%" PRId64 ", %d, %d)", g_stConfInfo.cdbName,
|
||||||
pInfo->consumerId);
|
atomic_fetch_add_64(&g_stConfInfo.nowTime, 1), cmdId, pInfo->consumerId);
|
||||||
|
|
||||||
taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);
|
taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);
|
||||||
|
|
||||||
|
@ -695,15 +684,15 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
|
||||||
|
|
||||||
static int32_t g_once_commit_flag = 0;
|
static int32_t g_once_commit_flag = 0;
|
||||||
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||||
taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
|
taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
|
||||||
|
|
||||||
if (0 == g_once_commit_flag) {
|
if (0 == g_once_commit_flag) {
|
||||||
g_once_commit_flag = 1;
|
g_once_commit_flag = 1;
|
||||||
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
|
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
|
||||||
}
|
}
|
||||||
|
|
||||||
char tmpString[128];
|
char tmpString[128];
|
||||||
taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
|
taosFprintfFile(g_fp, "%s tmq_commit_cb_print() be called\n", getCurrentTimeString(tmpString));
|
||||||
}
|
}
|
||||||
|
|
||||||
void build_consumer(SThreadInfo* pInfo) {
|
void build_consumer(SThreadInfo* pInfo) {
|
||||||
|
@ -768,7 +757,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
|
||||||
|
|
||||||
int retCode = queryDB(pInfo->taos, sqlStr);
|
int retCode = queryDB(pInfo->taos, sqlStr);
|
||||||
if (retCode != 0) {
|
if (retCode != 0) {
|
||||||
taosFprintfFile(g_fp, "consume id %d error in save consume result\n", pInfo->consumerId);
|
taosFprintfFile(g_fp, "consume id %d error in save consume result\n", pInfo->consumerId);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -797,9 +786,9 @@ void loop_consume(SThreadInfo* pInfo) {
|
||||||
sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
|
sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
|
||||||
pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||||
|
|
||||||
sprintf(filename, "%s/../log/meta_consumerid_%d.txt", configDir, pInfo->consumerId);
|
sprintf(filename, "%s/../log/meta_consumerid_%d.txt", configDir, pInfo->consumerId);
|
||||||
pInfo->pConsumeMetaFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
pInfo->pConsumeMetaFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||||
|
|
||||||
if (pInfo->pConsumeRowsFile == NULL || pInfo->pConsumeMetaFile == NULL) {
|
if (pInfo->pConsumeRowsFile == NULL || pInfo->pConsumeMetaFile == NULL) {
|
||||||
taosFprintfFile(g_fp, "%s create file fail for save rows or save meta\n", getCurrentTimeString(tmpString));
|
taosFprintfFile(g_fp, "%s create file fail for save rows or save meta\n", getCurrentTimeString(tmpString));
|
||||||
return;
|
return;
|
||||||
|
@ -815,15 +804,15 @@ void loop_consume(SThreadInfo* pInfo) {
|
||||||
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
|
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
|
||||||
if (tmqMsg) {
|
if (tmqMsg) {
|
||||||
if (0 != g_stConfInfo.showMsgFlag) {
|
if (0 != g_stConfInfo.showMsgFlag) {
|
||||||
tmq_res_t msgType = tmq_get_res_type(tmqMsg);
|
tmq_res_t msgType = tmq_get_res_type(tmqMsg);
|
||||||
if (msgType == TMQ_RES_TABLE_META) {
|
if (msgType == TMQ_RES_TABLE_META) {
|
||||||
totalRows += meta_msg_process(tmqMsg, pInfo, totalMsgs);
|
totalRows += meta_msg_process(tmqMsg, pInfo, totalMsgs);
|
||||||
} else if (msgType == TMQ_RES_DATA){
|
} else if (msgType == TMQ_RES_DATA) {
|
||||||
totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs);
|
totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs);
|
||||||
} else if (msgType == TMQ_RES_METADATA){
|
} else if (msgType == TMQ_RES_METADATA) {
|
||||||
meta_msg_process(tmqMsg, pInfo, totalMsgs);
|
meta_msg_process(tmqMsg, pInfo, totalMsgs);
|
||||||
totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs);
|
totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_free_result(tmqMsg);
|
taos_free_result(tmqMsg);
|
||||||
|
@ -865,6 +854,9 @@ void loop_consume(SThreadInfo* pInfo) {
|
||||||
|
|
||||||
taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
|
taosFprintfFile(g_fp, "==== consumerId: %d, consumeMsgCnt: %" PRId64 ", consumeRowCnt: %" PRId64 "\n",
|
||||||
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
|
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt);
|
||||||
|
|
||||||
|
taosFsyncFile(pInfo->pConsumeRowsFile);
|
||||||
|
taosCloseFile(&pInfo->pConsumeRowsFile);
|
||||||
}
|
}
|
||||||
|
|
||||||
void* consumeThreadFunc(void* param) {
|
void* consumeThreadFunc(void* param) {
|
||||||
|
@ -879,8 +871,8 @@ void* consumeThreadFunc(void* param) {
|
||||||
build_consumer(pInfo);
|
build_consumer(pInfo);
|
||||||
build_topic_list(pInfo);
|
build_topic_list(pInfo);
|
||||||
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
|
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
|
||||||
taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
|
taosFprintfFile(g_fp, "create consumer fail! tmq is null or topicList is null\n");
|
||||||
taos_close(pInfo->taos);
|
taos_close(pInfo->taos);
|
||||||
pInfo->taos = NULL;
|
pInfo->taos = NULL;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -889,7 +881,7 @@ void* consumeThreadFunc(void* param) {
|
||||||
if (err != 0) {
|
if (err != 0) {
|
||||||
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
|
pError("tmq_subscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||||
taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
|
taosFprintfFile(g_fp, "tmq_subscribe() fail! reason: %s\n", tmq_err2str(err));
|
||||||
taos_close(pInfo->taos);
|
taos_close(pInfo->taos);
|
||||||
pInfo->taos = NULL;
|
pInfo->taos = NULL;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
@ -944,7 +936,8 @@ void parseConsumeInfo() {
|
||||||
token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
|
token = strtok(g_stConfInfo.stThreads[i].topicString, delim);
|
||||||
while (token != NULL) {
|
while (token != NULL) {
|
||||||
// printf("%s\n", token );
|
// printf("%s\n", token );
|
||||||
tstrncpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token, sizeof(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]));
|
tstrncpy(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic], token,
|
||||||
|
sizeof(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]));
|
||||||
ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
|
ltrim(g_stConfInfo.stThreads[i].topics[g_stConfInfo.stThreads[i].numOfTopic]);
|
||||||
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
|
// printf("%s\n", g_stConfInfo.topics[g_stConfInfo.numOfTopic]);
|
||||||
g_stConfInfo.stThreads[i].numOfTopic++;
|
g_stConfInfo.stThreads[i].numOfTopic++;
|
||||||
|
@ -960,7 +953,8 @@ void parseConsumeInfo() {
|
||||||
ltrim(pstr);
|
ltrim(pstr);
|
||||||
char* ret = strchr(pstr, ch);
|
char* ret = strchr(pstr, ch);
|
||||||
memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr);
|
memcpy(g_stConfInfo.stThreads[i].key[g_stConfInfo.stThreads[i].numOfKey], pstr, ret - pstr);
|
||||||
tstrncpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1, sizeof(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey]));
|
tstrncpy(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey], ret + 1,
|
||||||
|
sizeof(g_stConfInfo.stThreads[i].value[g_stConfInfo.stThreads[i].numOfKey]));
|
||||||
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
|
// printf("key: %s, value: %s\n", g_stConfInfo.key[g_stConfInfo.numOfKey],
|
||||||
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
|
// g_stConfInfo.value[g_stConfInfo.numOfKey]);
|
||||||
g_stConfInfo.stThreads[i].numOfKey++;
|
g_stConfInfo.stThreads[i].numOfKey++;
|
||||||
|
@ -981,12 +975,12 @@ int32_t getConsumeInfo() {
|
||||||
}
|
}
|
||||||
|
|
||||||
sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
|
sprintf(sqlStr, "select * from %s.consumeinfo", g_stConfInfo.cdbName);
|
||||||
TAOS_RES *pRes = taos_query(pConn, sqlStr);
|
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
taosFprintfFile(g_fp, "error in get consumeinfo for %s\n", taos_errstr(pRes));
|
taosFprintfFile(g_fp, "error in get consumeinfo for %s\n", taos_errstr(pRes));
|
||||||
taosCloseFile(&g_fp);
|
taosCloseFile(&g_fp);
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
taos_close(pConn);
|
taos_close(pConn);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1037,19 +1031,18 @@ int32_t getConsumeInfo() {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t omb_data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex, int64_t* lenOfRows) {
|
static int32_t omb_data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex, int64_t* lenOfRows) {
|
||||||
char buf[16*1024];
|
char buf[16 * 1024];
|
||||||
int32_t totalRows = 0;
|
int32_t totalRows = 0;
|
||||||
int32_t totalLen = 0;
|
int32_t totalLen = 0;
|
||||||
|
|
||||||
// printf("topic: %s\n", tmq_get_topic_name(msg));
|
// printf("topic: %s\n", tmq_get_topic_name(msg));
|
||||||
//int32_t vgroupId = tmq_get_vgroup_id(msg);
|
// int32_t vgroupId = tmq_get_vgroup_id(msg);
|
||||||
//const char* dbName = tmq_get_db_name(msg);
|
// const char* dbName = tmq_get_db_name(msg);
|
||||||
|
|
||||||
//taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
|
// taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
|
||||||
//taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
|
// taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
|
||||||
// tmq_get_topic_name(msg), vgroupId);
|
// tmq_get_topic_name(msg), vgroupId);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
TAOS_ROW row = taos_fetch_row(msg);
|
TAOS_ROW row = taos_fetch_row(msg);
|
||||||
|
@ -1058,9 +1051,9 @@ static int32_t omb_data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t m
|
||||||
|
|
||||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||||
int32_t numOfFields = taos_field_count(msg);
|
int32_t numOfFields = taos_field_count(msg);
|
||||||
//int32_t* length = taos_fetch_lengths(msg);
|
// int32_t* length = taos_fetch_lengths(msg);
|
||||||
//int32_t precision = taos_result_precision(msg);
|
// int32_t precision = taos_result_precision(msg);
|
||||||
//const char* tbName = tmq_get_table_name(msg);
|
// const char* tbName = tmq_get_table_name(msg);
|
||||||
|
|
||||||
taos_print_row(buf, row, fields, numOfFields);
|
taos_print_row(buf, row, fields, numOfFields);
|
||||||
totalLen += strlen(buf);
|
totalLen += strlen(buf);
|
||||||
|
@ -1082,8 +1075,7 @@ void omb_loop_consume(SThreadInfo* pInfo) {
|
||||||
char tmpString[128];
|
char tmpString[128];
|
||||||
taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
|
taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
|
||||||
pInfo->consumerId);
|
pInfo->consumerId);
|
||||||
printf("%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString),
|
printf("%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), pInfo->consumerId);
|
||||||
pInfo->consumerId);
|
|
||||||
|
|
||||||
pInfo->ts = taosGetTimestampMs();
|
pInfo->ts = taosGetTimestampMs();
|
||||||
|
|
||||||
|
@ -1091,55 +1083,55 @@ void omb_loop_consume(SThreadInfo* pInfo) {
|
||||||
uint64_t lastPrintTime = taosGetTimestampMs();
|
uint64_t lastPrintTime = taosGetTimestampMs();
|
||||||
uint64_t startTs = taosGetTimestampMs();
|
uint64_t startTs = taosGetTimestampMs();
|
||||||
|
|
||||||
int64_t totalLenOfMsg = 0;
|
int64_t totalLenOfMsg = 0;
|
||||||
int64_t lastTotalLenOfMsg = 0;
|
int64_t lastTotalLenOfMsg = 0;
|
||||||
int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
|
int32_t consumeDelay = g_stConfInfo.consumeDelay == -1 ? -1 : (g_stConfInfo.consumeDelay * 1000);
|
||||||
while (running) {
|
while (running) {
|
||||||
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
|
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
|
||||||
if (tmqMsg) {
|
if (tmqMsg) {
|
||||||
int64_t lenOfMsg = 0;
|
int64_t lenOfMsg = 0;
|
||||||
totalRows += omb_data_msg_process(tmqMsg, pInfo, totalMsgs, &lenOfMsg);
|
totalRows += omb_data_msg_process(tmqMsg, pInfo, totalMsgs, &lenOfMsg);
|
||||||
totalLenOfMsg += lenOfMsg;
|
totalLenOfMsg += lenOfMsg;
|
||||||
taos_free_result(tmqMsg);
|
taos_free_result(tmqMsg);
|
||||||
totalMsgs++;
|
totalMsgs++;
|
||||||
int64_t currentPrintTime = taosGetTimestampMs();
|
int64_t currentPrintTime = taosGetTimestampMs();
|
||||||
if (currentPrintTime - lastPrintTime > 10 * 1000) {
|
if (currentPrintTime - lastPrintTime > 10 * 1000) {
|
||||||
int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg;
|
int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg;
|
||||||
int64_t deltaTime = currentPrintTime - lastPrintTime;
|
int64_t deltaTime = currentPrintTime - lastPrintTime;
|
||||||
printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64 ", rate: %.3f msgs/s, %.1f MB/s\n",
|
printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64
|
||||||
pInfo->consumerId, totalRows, totalMsgs,
|
", rate: %.3f msgs/s, %.1f MB/s\n",
|
||||||
(totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime,
|
pInfo->consumerId, totalRows, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime,
|
||||||
currentLenOfMsg*1000.0/(1024*1024)/deltaTime);
|
currentLenOfMsg * 1000.0 / (1024 * 1024) / deltaTime);
|
||||||
|
|
||||||
taosFprintfFile(
|
taosFprintfFile(g_fp,
|
||||||
g_fp, "consumer id %d has currently poll total msgs: %" PRId64 ", period cons rate: %.3f msgs/s, %.1f MB/s\n",
|
"consumer id %d has currently poll total msgs: %" PRId64
|
||||||
pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime, currentLenOfMsg*1000.0/deltaTime);
|
", period cons rate: %.3f msgs/s, %.1f MB/s\n",
|
||||||
|
pInfo->consumerId, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime,
|
||||||
|
currentLenOfMsg * 1000.0 / deltaTime);
|
||||||
lastPrintTime = currentPrintTime;
|
lastPrintTime = currentPrintTime;
|
||||||
lastTotalMsgs = totalMsgs;
|
lastTotalMsgs = totalMsgs;
|
||||||
lastTotalLenOfMsg = totalLenOfMsg;
|
lastTotalLenOfMsg = totalLenOfMsg;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
char tmpString[128];
|
char tmpString[128];
|
||||||
taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
|
taosFprintfFile(g_fp, "%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
|
||||||
printf("%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
|
printf("%s no poll more msg when time over, break consume\n", getCurrentTimeString(tmpString));
|
||||||
int64_t currentPrintTime = taosGetTimestampMs();
|
int64_t currentPrintTime = taosGetTimestampMs();
|
||||||
int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg;
|
int64_t currentLenOfMsg = totalLenOfMsg - lastTotalLenOfMsg;
|
||||||
int64_t deltaTime = currentPrintTime - lastPrintTime;
|
int64_t deltaTime = currentPrintTime - lastPrintTime;
|
||||||
printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64 ", rate: %.3f msgs/s, %.1f MB/s\n",
|
printf("consumer id %d has currently cons total rows: %" PRId64 ", msgs: %" PRId64
|
||||||
pInfo->consumerId, totalRows, totalMsgs,
|
", rate: %.3f msgs/s, %.1f MB/s\n",
|
||||||
(totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime,
|
pInfo->consumerId, totalRows, totalMsgs, (totalMsgs - lastTotalMsgs) * 1000.0 / deltaTime,
|
||||||
currentLenOfMsg*1000.0/(1024*1024)/deltaTime);
|
currentLenOfMsg * 1000.0 / (1024 * 1024) / deltaTime);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pInfo->consumeMsgCnt = totalMsgs;
|
pInfo->consumeMsgCnt = totalMsgs;
|
||||||
pInfo->consumeRowCnt = totalRows;
|
pInfo->consumeRowCnt = totalRows;
|
||||||
pInfo->consumeLen = totalLenOfMsg;
|
pInfo->consumeLen = totalLenOfMsg;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void* ombConsumeThreadFunc(void* param) {
|
void* ombConsumeThreadFunc(void* param) {
|
||||||
SThreadInfo* pInfo = (SThreadInfo*)param;
|
SThreadInfo* pInfo = (SThreadInfo*)param;
|
||||||
|
|
||||||
|
@ -1206,26 +1198,24 @@ void* ombConsumeThreadFunc(void* param) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int queryDbExec(TAOS* taos, char* command, QUERY_TYPE type) {
|
||||||
|
TAOS_RES* res = taos_query(taos, command);
|
||||||
|
int32_t code = taos_errno(res);
|
||||||
|
|
||||||
|
if (code != 0) {
|
||||||
static int queryDbExec(TAOS *taos, char *command, QUERY_TYPE type) {
|
pPrint("%s Failed to execute <%s>, reason: %s %s", GREEN, command, taos_errstr(res), NC);
|
||||||
TAOS_RES *res = taos_query(taos, command);
|
|
||||||
int32_t code = taos_errno(res);
|
|
||||||
|
|
||||||
if (code != 0) {
|
|
||||||
pPrint("%s Failed to execute <%s>, reason: %s %s", GREEN, command, taos_errstr(res), NC);
|
|
||||||
taos_free_result(res);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (INSERT_TYPE == type) {
|
|
||||||
int affectedRows = taos_affected_rows(res);
|
|
||||||
taos_free_result(res);
|
|
||||||
return affectedRows;
|
|
||||||
}
|
|
||||||
|
|
||||||
taos_free_result(res);
|
taos_free_result(res);
|
||||||
return 0;
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (INSERT_TYPE == type) {
|
||||||
|
int affectedRows = taos_affected_rows(res);
|
||||||
|
taos_free_result(res);
|
||||||
|
return affectedRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_free_result(res);
|
||||||
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void* ombProduceThreadFunc(void* param) {
|
void* ombProduceThreadFunc(void* param) {
|
||||||
|
@ -1233,101 +1223,100 @@ void* ombProduceThreadFunc(void* param) {
|
||||||
|
|
||||||
pInfo->taos = createNewTaosConnect();
|
pInfo->taos = createNewTaosConnect();
|
||||||
if (pInfo->taos == NULL) {
|
if (pInfo->taos == NULL) {
|
||||||
taosFprintfFile(g_fp, "taos_connect() fail, can not start producers!\n");
|
taosFprintfFile(g_fp, "taos_connect() fail, can not start producers!\n");
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t affectedRowsTotal = 0;
|
int64_t affectedRowsTotal = 0;
|
||||||
int64_t sendMsgs = 0;
|
int64_t sendMsgs = 0;
|
||||||
|
|
||||||
uint32_t totalSendLoopTimes = g_stConfInfo.runDurationMinutes * 60 * 1000 / SEND_TIME_UNIT; // send some msgs per 10ms
|
uint32_t totalSendLoopTimes =
|
||||||
uint32_t batchPerTblTimes = pInfo->producerRate / 100 / g_stConfInfo.batchSize;
|
g_stConfInfo.runDurationMinutes * 60 * 1000 / SEND_TIME_UNIT; // send some msgs per 10ms
|
||||||
uint32_t remainder = (pInfo->producerRate / 100) % g_stConfInfo.batchSize;
|
uint32_t batchPerTblTimes = pInfo->producerRate / 100 / g_stConfInfo.batchSize;
|
||||||
|
uint32_t remainder = (pInfo->producerRate / 100) % g_stConfInfo.batchSize;
|
||||||
if (remainder) {
|
if (remainder) {
|
||||||
batchPerTblTimes += 1;
|
batchPerTblTimes += 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
char* sqlBuf = taosMemoryMalloc(MAX_SQL_LEN);
|
char* sqlBuf = taosMemoryMalloc(MAX_SQL_LEN);
|
||||||
if (NULL == sqlBuf) {
|
if (NULL == sqlBuf) {
|
||||||
printf("malloc fail for sqlBuf\n");
|
printf("malloc fail for sqlBuf\n");
|
||||||
taos_close(pInfo->taos);
|
taos_close(pInfo->taos);
|
||||||
pInfo->taos = NULL;
|
pInfo->taos = NULL;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
printf("Produce Info: totalSendLoopTimes: %d, batchPerTblTimes: %d, producerRate: %d\n", totalSendLoopTimes, batchPerTblTimes, pInfo->producerRate);
|
printf("Produce Info: totalSendLoopTimes: %d, batchPerTblTimes: %d, producerRate: %d\n", totalSendLoopTimes,
|
||||||
|
batchPerTblTimes, pInfo->producerRate);
|
||||||
|
|
||||||
char ctbName[128] = {0};
|
char ctbName[128] = {0};
|
||||||
sprintf(ctbName, "%s.ctb%d", g_stConfInfo.dbName, pInfo->consumerId);
|
sprintf(ctbName, "%s.ctb%d", g_stConfInfo.dbName, pInfo->consumerId);
|
||||||
|
|
||||||
int64_t lastPrintTime = taosGetTimestampUs();
|
int64_t lastPrintTime = taosGetTimestampUs();
|
||||||
int64_t totalMsgLen = 0;
|
int64_t totalMsgLen = 0;
|
||||||
//int64_t timeStamp = taosGetTimestampUs();
|
// int64_t timeStamp = taosGetTimestampUs();
|
||||||
while (totalSendLoopTimes) {
|
while (totalSendLoopTimes) {
|
||||||
int64_t startTs = taosGetTimestampUs();
|
int64_t startTs = taosGetTimestampUs();
|
||||||
for (int i = 0; i < batchPerTblTimes; ++i) {
|
for (int i = 0; i < batchPerTblTimes; ++i) {
|
||||||
uint32_t msgsOfSql = g_stConfInfo.batchSize;
|
uint32_t msgsOfSql = g_stConfInfo.batchSize;
|
||||||
if ((i == batchPerTblTimes - 1) && (0 != remainder)) {
|
if ((i == batchPerTblTimes - 1) && (0 != remainder)) {
|
||||||
msgsOfSql = remainder;
|
msgsOfSql = remainder;
|
||||||
}
|
}
|
||||||
int len = 0;
|
int len = 0;
|
||||||
len += snprintf(sqlBuf+len, MAX_SQL_LEN - len, "insert into %s values ", ctbName);
|
len += snprintf(sqlBuf + len, MAX_SQL_LEN - len, "insert into %s values ", ctbName);
|
||||||
for (int j = 0; j < msgsOfSql; j++) {
|
for (int j = 0; j < msgsOfSql; j++) {
|
||||||
int64_t timeStamp = taosGetTimestampNs();
|
int64_t timeStamp = taosGetTimestampNs();
|
||||||
len += snprintf(sqlBuf+len, MAX_SQL_LEN - len, "(%" PRId64 ", \"%s\")", timeStamp, g_payload);
|
len += snprintf(sqlBuf + len, MAX_SQL_LEN - len, "(%" PRId64 ", \"%s\")", timeStamp, g_payload);
|
||||||
sendMsgs++;
|
sendMsgs++;
|
||||||
pInfo->totalProduceMsgs++;
|
pInfo->totalProduceMsgs++;
|
||||||
}
|
}
|
||||||
|
|
||||||
totalMsgLen += len;
|
totalMsgLen += len;
|
||||||
pInfo->totalMsgsLen += len;
|
pInfo->totalMsgsLen += len;
|
||||||
|
|
||||||
int64_t affectedRows = queryDbExec(pInfo->taos, sqlBuf, INSERT_TYPE);
|
|
||||||
if (affectedRows < 0) {
|
|
||||||
taos_close(pInfo->taos);
|
|
||||||
pInfo->taos = NULL;
|
|
||||||
taosMemoryFree(sqlBuf);
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
affectedRowsTotal += affectedRows;
|
int64_t affectedRows = queryDbExec(pInfo->taos, sqlBuf, INSERT_TYPE);
|
||||||
|
if (affectedRows < 0) {
|
||||||
|
taos_close(pInfo->taos);
|
||||||
|
pInfo->taos = NULL;
|
||||||
|
taosMemoryFree(sqlBuf);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
//printf("Produce Info: affectedRows: %" PRId64 "\n", affectedRows);
|
affectedRowsTotal += affectedRows;
|
||||||
|
|
||||||
|
// printf("Produce Info: affectedRows: %" PRId64 "\n", affectedRows);
|
||||||
}
|
}
|
||||||
totalSendLoopTimes -= 1;
|
totalSendLoopTimes -= 1;
|
||||||
|
|
||||||
// calc spent time
|
// calc spent time
|
||||||
int64_t currentTs = taosGetTimestampUs();
|
int64_t currentTs = taosGetTimestampUs();
|
||||||
int64_t delta = currentTs - startTs;
|
int64_t delta = currentTs - startTs;
|
||||||
if (delta < SEND_TIME_UNIT * 1000) {
|
if (delta < SEND_TIME_UNIT * 1000) {
|
||||||
int64_t sleepLen = (int32_t)(SEND_TIME_UNIT * 1000 - delta);
|
int64_t sleepLen = (int32_t)(SEND_TIME_UNIT * 1000 - delta);
|
||||||
//printf("sleep %" PRId64 " us, use time: %" PRId64 " us\n", sleepLen, delta);
|
// printf("sleep %" PRId64 " us, use time: %" PRId64 " us\n", sleepLen, delta);
|
||||||
taosUsleep((int32_t)sleepLen);
|
taosUsleep((int32_t)sleepLen);
|
||||||
}
|
}
|
||||||
|
|
||||||
currentTs = taosGetTimestampUs();
|
currentTs = taosGetTimestampUs();
|
||||||
delta = currentTs - lastPrintTime;
|
delta = currentTs - lastPrintTime;
|
||||||
if (delta > 10 * 1000 * 1000) {
|
if (delta > 10 * 1000 * 1000) {
|
||||||
printf("producer[%d] info: %" PRId64 " msgs, %" PRId64 " Byte, %" PRId64 " us, totalSendLoopTimes: %d\n",
|
printf("producer[%d] info: %" PRId64 " msgs, %" PRId64 " Byte, %" PRId64 " us, totalSendLoopTimes: %d\n",
|
||||||
pInfo->consumerId, sendMsgs, totalMsgLen, delta, totalSendLoopTimes);
|
pInfo->consumerId, sendMsgs, totalMsgLen, delta, totalSendLoopTimes);
|
||||||
printf("producer[%d] rate: %1.f msgs/s, %1.f KB/s\n",
|
printf("producer[%d] rate: %1.f msgs/s, %1.f KB/s\n", pInfo->consumerId, sendMsgs * 1000.0 * 1000 / delta,
|
||||||
pInfo->consumerId,
|
(totalMsgLen / 1024.0) / (delta / (1000 * 1000)));
|
||||||
sendMsgs * 1000.0 * 1000 / delta,
|
lastPrintTime = currentTs;
|
||||||
(totalMsgLen / 1024.0) / (delta / (1000*1000)));
|
sendMsgs = 0;
|
||||||
lastPrintTime = currentTs;
|
totalMsgLen = 0;
|
||||||
sendMsgs = 0;
|
}
|
||||||
totalMsgLen = 0;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
printf("affectedRowsTotal: %"PRId64"\n", affectedRowsTotal);
|
printf("affectedRowsTotal: %" PRId64 "\n", affectedRowsTotal);
|
||||||
taos_close(pInfo->taos);
|
taos_close(pInfo->taos);
|
||||||
pInfo->taos = NULL;
|
pInfo->taos = NULL;
|
||||||
taosMemoryFree(sqlBuf);
|
taosMemoryFree(sqlBuf);
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void printProduceInfo(int64_t start) {
|
void printProduceInfo(int64_t start) {
|
||||||
int64_t totalMsgs = 0;
|
int64_t totalMsgs = 0;
|
||||||
int64_t totalLenOfMsgs = 0;
|
int64_t totalLenOfMsgs = 0;
|
||||||
|
@ -1344,87 +1333,86 @@ void printProduceInfo(int64_t start) {
|
||||||
double tInMs = (double)t / 1000000.0;
|
double tInMs = (double)t / 1000000.0;
|
||||||
printf("Spent %.3f seconds to prod %" PRIu64 " msgs, %" PRIu64 " Byte\n\n", tInMs, totalMsgs, totalLenOfMsgs);
|
printf("Spent %.3f seconds to prod %" PRIu64 " msgs, %" PRIu64 " Byte\n\n", tInMs, totalMsgs, totalLenOfMsgs);
|
||||||
|
|
||||||
|
|
||||||
printf("Spent %.3f seconds to prod %" PRIu64 " msgs with %d producer(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
|
printf("Spent %.3f seconds to prod %" PRIu64 " msgs with %d producer(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
|
||||||
tInMs, totalMsgs, g_stConfInfo.producers,
|
tInMs, totalMsgs, g_stConfInfo.producers, (double)totalMsgs / tInMs,
|
||||||
(double)totalMsgs / tInMs,
|
(double)totalLenOfMsgs / (1024.0 * 1024) / tInMs);
|
||||||
(double)totalLenOfMsgs/(1024.0*1024)/tInMs);
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
void startOmbConsume() {
|
void startOmbConsume() {
|
||||||
TdThreadAttr thattr;
|
TdThreadAttr thattr;
|
||||||
taosThreadAttrInit(&thattr);
|
taosThreadAttrInit(&thattr);
|
||||||
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
|
taosThreadAttrSetDetachState(&thattr, PTHREAD_CREATE_JOINABLE);
|
||||||
|
|
||||||
if (0 != g_stConfInfo.producers) {
|
if (0 != g_stConfInfo.producers) {
|
||||||
TAOS* taos = createNewTaosConnect();
|
TAOS* taos = createNewTaosConnect();
|
||||||
if (taos == NULL) {
|
if (taos == NULL) {
|
||||||
taosFprintfFile(g_fp, "taos_connect() fail, can not create db, stbl, ctbl, topic!\n");
|
taosFprintfFile(g_fp, "taos_connect() fail, can not create db, stbl, ctbl, topic!\n");
|
||||||
return ;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
char stbName[16] = "stb";
|
char stbName[16] = "stb";
|
||||||
char ctbPrefix[16] = "ctb";
|
char ctbPrefix[16] = "ctb";
|
||||||
|
|
||||||
char sql[256] = {0};
|
char sql[256] = {0};
|
||||||
sprintf(sql, "drop database if exists %s", g_stConfInfo.dbName);
|
sprintf(sql, "drop database if exists %s", g_stConfInfo.dbName);
|
||||||
printf("SQL: %s\n", sql);
|
printf("SQL: %s\n", sql);
|
||||||
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
|
||||||
|
|
||||||
sprintf(sql, "create database if not exists %s precision 'ns' vgroups %d", g_stConfInfo.dbName, g_stConfInfo.producers);
|
|
||||||
printf("SQL: %s\n", sql);
|
|
||||||
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
||||||
|
|
||||||
sprintf(sql, "create stable %s.%s (ts timestamp, payload binary(%d)) tags (t bigint) ", g_stConfInfo.dbName, stbName, g_stConfInfo.payloadLen);
|
sprintf(sql, "create database if not exists %s precision 'ns' vgroups %d", g_stConfInfo.dbName,
|
||||||
printf("SQL: %s\n", sql);
|
g_stConfInfo.producers);
|
||||||
|
printf("SQL: %s\n", sql);
|
||||||
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
||||||
|
|
||||||
for (int i = 0; i < g_stConfInfo.producers; i++) {
|
sprintf(sql, "create stable %s.%s (ts timestamp, payload binary(%d)) tags (t bigint) ", g_stConfInfo.dbName,
|
||||||
sprintf(sql, "create table %s.%s%d using %s.stb tags(%d) ", g_stConfInfo.dbName, ctbPrefix, i, g_stConfInfo.dbName, i);
|
stbName, g_stConfInfo.payloadLen);
|
||||||
printf("SQL: %s\n", sql);
|
printf("SQL: %s\n", sql);
|
||||||
|
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
||||||
|
|
||||||
|
for (int i = 0; i < g_stConfInfo.producers; i++) {
|
||||||
|
sprintf(sql, "create table %s.%s%d using %s.stb tags(%d) ", g_stConfInfo.dbName, ctbPrefix, i,
|
||||||
|
g_stConfInfo.dbName, i);
|
||||||
|
printf("SQL: %s\n", sql);
|
||||||
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
||||||
}
|
}
|
||||||
|
|
||||||
// create topic
|
// create topic
|
||||||
sprintf(sql, "create topic %s as stable %s.%s", g_stConfInfo.topic, g_stConfInfo.dbName, stbName);
|
sprintf(sql, "create topic %s as stable %s.%s", g_stConfInfo.topic, g_stConfInfo.dbName, stbName);
|
||||||
printf("SQL: %s\n", sql);
|
printf("SQL: %s\n", sql);
|
||||||
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
queryDbExec(taos, sql, NO_INSERT_TYPE);
|
||||||
|
|
||||||
|
|
||||||
int32_t producerRate = ceil(g_stConfInfo.producerRate / g_stConfInfo.producers);
|
int32_t producerRate = ceil(g_stConfInfo.producerRate / g_stConfInfo.producers);
|
||||||
|
|
||||||
printf("==== create %d produce thread ====\n", g_stConfInfo.producers);
|
printf("==== create %d produce thread ====\n", g_stConfInfo.producers);
|
||||||
for (int32_t i = 0; i < g_stConfInfo.producers; ++i) {
|
for (int32_t i = 0; i < g_stConfInfo.producers; ++i) {
|
||||||
g_stConfInfo.stProdThreads[i].consumerId = i;
|
g_stConfInfo.stProdThreads[i].consumerId = i;
|
||||||
g_stConfInfo.stProdThreads[i].producerRate = producerRate;
|
g_stConfInfo.stProdThreads[i].producerRate = producerRate;
|
||||||
taosThreadCreate(&(g_stConfInfo.stProdThreads[i].thread), &thattr, ombProduceThreadFunc,
|
taosThreadCreate(&(g_stConfInfo.stProdThreads[i].thread), &thattr, ombProduceThreadFunc,
|
||||||
(void*)(&(g_stConfInfo.stProdThreads[i])));
|
(void*)(&(g_stConfInfo.stProdThreads[i])));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (0 == g_stConfInfo.numOfThread) {
|
if (0 == g_stConfInfo.numOfThread) {
|
||||||
int64_t start = taosGetTimestampUs();
|
int64_t start = taosGetTimestampUs();
|
||||||
for (int32_t i = 0; i < g_stConfInfo.producers; i++) {
|
for (int32_t i = 0; i < g_stConfInfo.producers; i++) {
|
||||||
taosThreadJoin(g_stConfInfo.stProdThreads[i].thread, NULL);
|
taosThreadJoin(g_stConfInfo.stProdThreads[i].thread, NULL);
|
||||||
taosThreadClear(&g_stConfInfo.stProdThreads[i].thread);
|
taosThreadClear(&g_stConfInfo.stProdThreads[i].thread);
|
||||||
}
|
}
|
||||||
|
|
||||||
printProduceInfo(start);
|
printProduceInfo(start);
|
||||||
|
|
||||||
taosFprintfFile(g_fp, "==== close tmqlog ====\n");
|
|
||||||
taosCloseFile(&g_fp);
|
|
||||||
taos_close(taos);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
taos_close(taos);
|
taosFprintfFile(g_fp, "==== close tmqlog ====\n");
|
||||||
|
taosCloseFile(&g_fp);
|
||||||
|
taos_close(taos);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_close(taos);
|
||||||
}
|
}
|
||||||
|
|
||||||
// pthread_create one thread to consume
|
// pthread_create one thread to consume
|
||||||
taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread);
|
taosFprintfFile(g_fp, "==== create %d consume thread ====\n", g_stConfInfo.numOfThread);
|
||||||
for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
|
for (int32_t i = 0; i < g_stConfInfo.numOfThread; ++i) {
|
||||||
g_stConfInfo.stThreads[i].consumerId = i;
|
g_stConfInfo.stThreads[i].consumerId = i;
|
||||||
taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, ombConsumeThreadFunc,
|
taosThreadCreate(&(g_stConfInfo.stThreads[i].thread), &thattr, ombConsumeThreadFunc,
|
||||||
(void*)(&(g_stConfInfo.stThreads[i])));
|
(void*)(&(g_stConfInfo.stThreads[i])));
|
||||||
}
|
}
|
||||||
|
@ -1443,24 +1431,23 @@ void startOmbConsume() {
|
||||||
int64_t totalLenOfMsgs = 0;
|
int64_t totalLenOfMsgs = 0;
|
||||||
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
|
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
|
||||||
totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt;
|
totalMsgs += g_stConfInfo.stThreads[i].consumeMsgCnt;
|
||||||
totalLenOfMsgs += g_stConfInfo.stThreads[i].consumeLen;
|
totalLenOfMsgs += g_stConfInfo.stThreads[i].consumeLen;
|
||||||
totalRows += g_stConfInfo.stThreads[i].consumeRowCnt;
|
totalRows += g_stConfInfo.stThreads[i].consumeRowCnt;
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t t = end - start;
|
int64_t t = end - start;
|
||||||
if (0 == t) t = 1;
|
if (0 == t) t = 1;
|
||||||
|
|
||||||
double tInMs = (double)t / 1000000.0;
|
double tInMs = (double)t / 1000000.0;
|
||||||
taosFprintfFile(g_fp,
|
taosFprintfFile(
|
||||||
"Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
|
g_fp, "Spent %.3f seconds to poll msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
|
||||||
tInMs, totalMsgs, g_stConfInfo.numOfThread,
|
tInMs, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs),
|
||||||
(double)(totalMsgs / tInMs),
|
(double)totalLenOfMsgs / (1024 * 1024) / tInMs);
|
||||||
(double)totalLenOfMsgs/(1024*1024)/tInMs);
|
|
||||||
|
|
||||||
printf("Spent %.3f seconds to cons rows: %" PRIu64 " msgs: %" PRIu64 " with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
|
printf("Spent %.3f seconds to cons rows: %" PRIu64 " msgs: %" PRIu64
|
||||||
tInMs, totalRows, totalMsgs, g_stConfInfo.numOfThread,
|
" with %d thread(s), throughput: %.3f msgs/s, %.1f MB/s\n\n",
|
||||||
(double)(totalMsgs / tInMs),
|
tInMs, totalRows, totalMsgs, g_stConfInfo.numOfThread, (double)(totalMsgs / tInMs),
|
||||||
(double)totalLenOfMsgs/(1024*1024)/tInMs);
|
(double)totalLenOfMsgs / (1024 * 1024) / tInMs);
|
||||||
|
|
||||||
taosFprintfFile(g_fp, "==== close tmqlog ====\n");
|
taosFprintfFile(g_fp, "==== close tmqlog ====\n");
|
||||||
taosCloseFile(&g_fp);
|
taosCloseFile(&g_fp);
|
||||||
|
@ -1468,20 +1455,19 @@ void startOmbConsume() {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
int main(int32_t argc, char* argv[]) {
|
int main(int32_t argc, char* argv[]) {
|
||||||
parseArgument(argc, argv);
|
parseArgument(argc, argv);
|
||||||
|
|
||||||
if (0 != strlen(g_stConfInfo.topic)) {
|
if (0 != strlen(g_stConfInfo.topic)) {
|
||||||
startOmbConsume();
|
startOmbConsume();
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t retCode = getConsumeInfo();
|
int32_t retCode = getConsumeInfo();
|
||||||
if (0 != retCode) {
|
if (0 != retCode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
saveConfigToLogFile();
|
saveConfigToLogFile();
|
||||||
|
|
||||||
tmqSetSignalHandle();
|
tmqSetSignalHandle();
|
||||||
|
|
Loading…
Reference in New Issue