commit
bcf8fbfdf2
|
@ -32,6 +32,7 @@
|
||||||
#define MAX_SQL_STR_LEN (1024 * 1024)
|
#define MAX_SQL_STR_LEN (1024 * 1024)
|
||||||
#define MAX_ROW_STR_LEN (16 * 1024)
|
#define MAX_ROW_STR_LEN (16 * 1024)
|
||||||
#define MAX_CONSUMER_THREAD_CNT (16)
|
#define MAX_CONSUMER_THREAD_CNT (16)
|
||||||
|
#define MAX_VGROUP_CNT (32)
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
|
@ -62,6 +63,10 @@ typedef struct {
|
||||||
tmq_t* tmq;
|
tmq_t* tmq;
|
||||||
tmq_list_t* topicList;
|
tmq_list_t* topicList;
|
||||||
|
|
||||||
|
int32_t numOfVgroups;
|
||||||
|
int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume
|
||||||
|
int64_t ts;
|
||||||
|
|
||||||
} SThreadInfo;
|
} SThreadInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -70,6 +75,7 @@ typedef struct {
|
||||||
char dbName[32];
|
char dbName[32];
|
||||||
int32_t showMsgFlag;
|
int32_t showMsgFlag;
|
||||||
int32_t showRowFlag;
|
int32_t showRowFlag;
|
||||||
|
int32_t saveRowFlag;
|
||||||
int32_t consumeDelay; // unit s
|
int32_t consumeDelay; // unit s
|
||||||
int32_t numOfThread;
|
int32_t numOfThread;
|
||||||
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
|
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
|
||||||
|
@ -77,6 +83,7 @@ typedef struct {
|
||||||
|
|
||||||
static SConfInfo g_stConfInfo;
|
static SConfInfo g_stConfInfo;
|
||||||
TdFilePtr g_fp = NULL;
|
TdFilePtr g_fp = NULL;
|
||||||
|
static int running = 1;
|
||||||
|
|
||||||
// char* g_pRowValue = NULL;
|
// char* g_pRowValue = NULL;
|
||||||
// TdFilePtr g_fp = NULL;
|
// TdFilePtr g_fp = NULL;
|
||||||
|
@ -93,6 +100,8 @@ static void printHelp() {
|
||||||
printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
|
printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
|
||||||
printf("%s%s\n", indent, "-r");
|
printf("%s%s\n", indent, "-r");
|
||||||
printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag);
|
printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag);
|
||||||
|
printf("%s%s\n", indent, "-s");
|
||||||
|
printf("%s%s%s%d\n", indent, indent, "saveRowFlag, default is ", g_stConfInfo.saveRowFlag);
|
||||||
printf("%s%s\n", indent, "-y");
|
printf("%s%s\n", indent, "-y");
|
||||||
printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
|
printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
|
||||||
exit(EXIT_SUCCESS);
|
exit(EXIT_SUCCESS);
|
||||||
|
@ -135,6 +144,7 @@ void saveConfigToLogFile() {
|
||||||
taosFprintfFile(g_fp, "# cdbName: %s\n", g_stConfInfo.cdbName);
|
taosFprintfFile(g_fp, "# cdbName: %s\n", g_stConfInfo.cdbName);
|
||||||
taosFprintfFile(g_fp, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag);
|
taosFprintfFile(g_fp, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag);
|
||||||
taosFprintfFile(g_fp, "# showRowFlag: %d\n", g_stConfInfo.showRowFlag);
|
taosFprintfFile(g_fp, "# showRowFlag: %d\n", g_stConfInfo.showRowFlag);
|
||||||
|
taosFprintfFile(g_fp, "# saveRowFlag: %d\n", g_stConfInfo.saveRowFlag);
|
||||||
taosFprintfFile(g_fp, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay);
|
taosFprintfFile(g_fp, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay);
|
||||||
taosFprintfFile(g_fp, "# numOfThread: %d\n", g_stConfInfo.numOfThread);
|
taosFprintfFile(g_fp, "# numOfThread: %d\n", g_stConfInfo.numOfThread);
|
||||||
|
|
||||||
|
@ -165,6 +175,7 @@ void parseArgument(int32_t argc, char* argv[]) {
|
||||||
memset(&g_stConfInfo, 0, sizeof(SConfInfo));
|
memset(&g_stConfInfo, 0, sizeof(SConfInfo));
|
||||||
g_stConfInfo.showMsgFlag = 0;
|
g_stConfInfo.showMsgFlag = 0;
|
||||||
g_stConfInfo.showRowFlag = 0;
|
g_stConfInfo.showRowFlag = 0;
|
||||||
|
g_stConfInfo.saveRowFlag = 0;
|
||||||
g_stConfInfo.consumeDelay = 5;
|
g_stConfInfo.consumeDelay = 5;
|
||||||
|
|
||||||
for (int32_t i = 1; i < argc; i++) {
|
for (int32_t i = 1; i < argc; i++) {
|
||||||
|
@ -181,6 +192,8 @@ void parseArgument(int32_t argc, char* argv[]) {
|
||||||
g_stConfInfo.showMsgFlag = atol(argv[++i]);
|
g_stConfInfo.showMsgFlag = atol(argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-r") == 0) {
|
} else if (strcmp(argv[i], "-r") == 0) {
|
||||||
g_stConfInfo.showRowFlag = atol(argv[++i]);
|
g_stConfInfo.showRowFlag = atol(argv[++i]);
|
||||||
|
} else if (strcmp(argv[i], "-s") == 0) {
|
||||||
|
g_stConfInfo.saveRowFlag = atol(argv[++i]);
|
||||||
} else if (strcmp(argv[i], "-y") == 0) {
|
} else if (strcmp(argv[i], "-y") == 0) {
|
||||||
g_stConfInfo.consumeDelay = atol(argv[++i]);
|
g_stConfInfo.consumeDelay = atol(argv[++i]);
|
||||||
} else {
|
} else {
|
||||||
|
@ -200,6 +213,7 @@ void parseArgument(int32_t argc, char* argv[]) {
|
||||||
pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
|
pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
|
||||||
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
|
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
|
||||||
pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
|
pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
|
||||||
|
pPrint("%s saveRowFlag:%d %s", GREEN, g_stConfInfo.saveRowFlag, NC);
|
||||||
#endif
|
#endif
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -225,15 +239,64 @@ void ltrim(char* str) {
|
||||||
// return str;
|
// return str;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int running = 1;
|
void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
|
||||||
static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) {
|
int32_t i;
|
||||||
|
for (i = 0; i < pInfo->numOfVgroups; i++) {
|
||||||
|
if (vgroupId == pInfo->rowsOfPerVgroups[i][0]) {
|
||||||
|
pInfo->rowsOfPerVgroups[i][1] += rows;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][0] = vgroupId;
|
||||||
|
pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows;
|
||||||
|
pInfo->numOfVgroups++;
|
||||||
|
|
||||||
|
taosFprintfFile(g_fp, "consume id %d, add one new vogroup id: %d\n", pInfo->consumerId, vgroupId);
|
||||||
|
if (pInfo->numOfVgroups > MAX_VGROUP_CNT) {
|
||||||
|
taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId, pInfo->numOfVgroups, vgroupId);
|
||||||
|
taosCloseFile(&g_fp);
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
|
||||||
|
char sqlStr[1100] = {0};
|
||||||
|
|
||||||
|
if (strlen(buf) > 1024) {
|
||||||
|
taosFprintfFile(g_fp, "The length of one row[%d] is overflow 1024\n", strlen(buf));
|
||||||
|
taosCloseFile(&g_fp);
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||||
|
assert(pConn != NULL);
|
||||||
|
|
||||||
|
sprintf(sqlStr, "insert into %s.content_%d values (%"PRId64", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId, pInfo->ts++, buf);
|
||||||
|
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
pError("error in insert consume result, reason:%s\n", taos_errstr(pRes));
|
||||||
|
taosFprintfFile(g_fp, "error in insert consume result, reason:%s\n", taos_errstr(pRes));
|
||||||
|
taosCloseFile(&g_fp);
|
||||||
|
taos_free_result(pRes);
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
|
||||||
char buf[1024];
|
char buf[1024];
|
||||||
int32_t totalRows = 0;
|
int32_t totalRows = 0;
|
||||||
|
|
||||||
// printf("topic: %s\n", tmq_get_topic_name(msg));
|
// printf("topic: %s\n", tmq_get_topic_name(msg));
|
||||||
// printf("vg:%d\n", tmq_get_vgroup_id(msg));
|
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
||||||
taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n", msgIndex, threadLable);
|
|
||||||
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg));
|
taosFprintfFile(g_fp, "msg index:%" PRId64 ", consumerId: %d\n", msgIndex, pInfo->consumerId);
|
||||||
|
//taosFprintfFile(g_fp, "topic: %s, vgroupId: %d, tableName: %s\n", tmq_get_topic_name(msg), vgroupId, tmq_get_table_name(msg));
|
||||||
|
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), vgroupId);
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
TAOS_ROW row = taos_fetch_row(msg);
|
TAOS_ROW row = taos_fetch_row(msg);
|
||||||
|
@ -247,11 +310,16 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable)
|
||||||
|
|
||||||
if (0 != g_stConfInfo.showRowFlag) {
|
if (0 != g_stConfInfo.showRowFlag) {
|
||||||
taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
|
taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
|
||||||
|
if (0 != g_stConfInfo.saveRowFlag) {
|
||||||
|
saveConsumeContentToTbl(pInfo, buf);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
totalRows++;
|
totalRows++;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
addRowsToVgroupId(pInfo, vgroupId, totalRows);
|
||||||
|
|
||||||
return totalRows;
|
return totalRows;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -344,6 +412,32 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
|
||||||
|
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
// vgroups
|
||||||
|
for (i = 0; i < pInfo->numOfVgroups; i++) {
|
||||||
|
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
||||||
|
sprintf(sqlStr, "insert into %s.vgroup_%d values (%"PRId64", %d, %" PRId64 ", %" PRId64 ", %d)",
|
||||||
|
g_stConfInfo.cdbName,
|
||||||
|
now,
|
||||||
|
pInfo->consumerId,
|
||||||
|
pInfo->consumeMsgCnt,
|
||||||
|
pInfo->consumeRowCnt,
|
||||||
|
pInfo->checkresult);
|
||||||
|
|
||||||
|
char tmpString[128];
|
||||||
|
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId ,sqlStr);
|
||||||
|
|
||||||
|
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||||
|
if (taos_errno(pRes) != 0) {
|
||||||
|
pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
|
||||||
|
taos_free_result(pRes);
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
taos_free_result(pRes);
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -356,11 +450,13 @@ void loop_consume(SThreadInfo* pInfo) {
|
||||||
char tmpString[128];
|
char tmpString[128];
|
||||||
taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), pInfo->consumerId);
|
taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), pInfo->consumerId);
|
||||||
|
|
||||||
|
pInfo->ts = taosGetTimestampMs();
|
||||||
|
|
||||||
while (running) {
|
while (running) {
|
||||||
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
|
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
|
||||||
if (tmqMsg) {
|
if (tmqMsg) {
|
||||||
if (0 != g_stConfInfo.showMsgFlag) {
|
if (0 != g_stConfInfo.showMsgFlag) {
|
||||||
totalRows += msg_process(tmqMsg, totalMsgs, pInfo->consumerId);
|
totalRows += msg_process(tmqMsg, pInfo, totalMsgs);
|
||||||
}
|
}
|
||||||
|
|
||||||
taos_free_result(tmqMsg);
|
taos_free_result(tmqMsg);
|
||||||
|
|
Loading…
Reference in New Issue