test: modify consumer processor
This commit is contained in:
parent
a352f2676d
commit
f6f43be51e
|
@ -32,6 +32,7 @@
|
|||
#define MAX_SQL_STR_LEN (1024 * 1024)
|
||||
#define MAX_ROW_STR_LEN (16 * 1024)
|
||||
#define MAX_CONSUMER_THREAD_CNT (16)
|
||||
#define MAX_VGROUP_CNT (32)
|
||||
|
||||
typedef struct {
|
||||
TdThread thread;
|
||||
|
@ -61,6 +62,10 @@ typedef struct {
|
|||
|
||||
tmq_t* tmq;
|
||||
tmq_list_t* topicList;
|
||||
|
||||
int32_t numOfVgroups;
|
||||
int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume
|
||||
int64_t ts;
|
||||
|
||||
} SThreadInfo;
|
||||
|
||||
|
@ -69,7 +74,8 @@ typedef struct {
|
|||
char cdbName[32];
|
||||
char dbName[32];
|
||||
int32_t showMsgFlag;
|
||||
int32_t showRowFlag;
|
||||
int32_t showRowFlag;
|
||||
int32_t saveRowFlag;
|
||||
int32_t consumeDelay; // unit s
|
||||
int32_t numOfThread;
|
||||
SThreadInfo stThreads[MAX_CONSUMER_THREAD_CNT];
|
||||
|
@ -77,6 +83,7 @@ typedef struct {
|
|||
|
||||
static SConfInfo g_stConfInfo;
|
||||
TdFilePtr g_fp = NULL;
|
||||
static int running = 1;
|
||||
|
||||
// char* g_pRowValue = NULL;
|
||||
// TdFilePtr g_fp = NULL;
|
||||
|
@ -93,6 +100,8 @@ static void printHelp() {
|
|||
printf("%s%s%s%d\n", indent, indent, "showMsgFlag, default is ", g_stConfInfo.showMsgFlag);
|
||||
printf("%s%s\n", indent, "-r");
|
||||
printf("%s%s%s%d\n", indent, indent, "showRowFlag, default is ", g_stConfInfo.showRowFlag);
|
||||
printf("%s%s\n", indent, "-s");
|
||||
printf("%s%s%s%d\n", indent, indent, "saveRowFlag, default is ", g_stConfInfo.saveRowFlag);
|
||||
printf("%s%s\n", indent, "-y");
|
||||
printf("%s%s%s%d\n", indent, indent, "consume delay, default is s", g_stConfInfo.consumeDelay);
|
||||
exit(EXIT_SUCCESS);
|
||||
|
@ -135,6 +144,7 @@ void saveConfigToLogFile() {
|
|||
taosFprintfFile(g_fp, "# cdbName: %s\n", g_stConfInfo.cdbName);
|
||||
taosFprintfFile(g_fp, "# showMsgFlag: %d\n", g_stConfInfo.showMsgFlag);
|
||||
taosFprintfFile(g_fp, "# showRowFlag: %d\n", g_stConfInfo.showRowFlag);
|
||||
taosFprintfFile(g_fp, "# saveRowFlag: %d\n", g_stConfInfo.saveRowFlag);
|
||||
taosFprintfFile(g_fp, "# consumeDelay: %d\n", g_stConfInfo.consumeDelay);
|
||||
taosFprintfFile(g_fp, "# numOfThread: %d\n", g_stConfInfo.numOfThread);
|
||||
|
||||
|
@ -165,6 +175,7 @@ void parseArgument(int32_t argc, char* argv[]) {
|
|||
memset(&g_stConfInfo, 0, sizeof(SConfInfo));
|
||||
g_stConfInfo.showMsgFlag = 0;
|
||||
g_stConfInfo.showRowFlag = 0;
|
||||
g_stConfInfo.saveRowFlag = 0;
|
||||
g_stConfInfo.consumeDelay = 5;
|
||||
|
||||
for (int32_t i = 1; i < argc; i++) {
|
||||
|
@ -181,6 +192,8 @@ void parseArgument(int32_t argc, char* argv[]) {
|
|||
g_stConfInfo.showMsgFlag = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-r") == 0) {
|
||||
g_stConfInfo.showRowFlag = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-s") == 0) {
|
||||
g_stConfInfo.saveRowFlag = atol(argv[++i]);
|
||||
} else if (strcmp(argv[i], "-y") == 0) {
|
||||
g_stConfInfo.consumeDelay = atol(argv[++i]);
|
||||
} else {
|
||||
|
@ -200,6 +213,7 @@ void parseArgument(int32_t argc, char* argv[]) {
|
|||
pPrint("%s consumeDelay:%d %s", GREEN, g_stConfInfo.consumeDelay, NC);
|
||||
pPrint("%s showMsgFlag:%d %s", GREEN, g_stConfInfo.showMsgFlag, NC);
|
||||
pPrint("%s showRowFlag:%d %s", GREEN, g_stConfInfo.showRowFlag, NC);
|
||||
pPrint("%s saveRowFlag:%d %s", GREEN, g_stConfInfo.saveRowFlag, NC);
|
||||
#endif
|
||||
}
|
||||
|
||||
|
@ -225,15 +239,58 @@ void ltrim(char* str) {
|
|||
// return str;
|
||||
}
|
||||
|
||||
static int running = 1;
|
||||
static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) {
|
||||
void addRowsToVgroupId(SThreadInfo* pInfo, int32_t vgroupId, int32_t rows) {
|
||||
int32_t i;
|
||||
for (i = 0; i < pInfo->numOfVgroups; i++) {
|
||||
if (vgroupId == pInfo->rowsOfPerVgroups[i][0]) {
|
||||
pInfo->rowsOfPerVgroups[i][1] += rows;
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][0] = vgroupId;
|
||||
pInfo->rowsOfPerVgroups[pInfo->numOfVgroups][1] += rows;
|
||||
pInfo->numOfVgroups++;
|
||||
|
||||
taosFprintfFile(g_fp, "consume id %d, add one new vogroup id: %d\n", pInfo->consumerId, vgroupId);
|
||||
if (pInfo->numOfVgroups > MAX_VGROUP_CNT) {
|
||||
taosFprintfFile(g_fp, "====consume id %d, vgroup num %d over than 32. new vgroupId: %d\n", pInfo->consumerId, pInfo->numOfVgroups, vgroupId);
|
||||
taosCloseFile(&g_fp);
|
||||
exit(-1);
|
||||
}
|
||||
}
|
||||
|
||||
int32_t saveConsumeContentToTbl(SThreadInfo* pInfo, char* buf) {
|
||||
char sqlStr[1024] = {0};
|
||||
|
||||
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||
assert(pConn != NULL);
|
||||
|
||||
sprintf(sqlStr, "insert into %s.content_%d values (%"PRId64", \'%s\')", g_stConfInfo.cdbName, pInfo->consumerId, pInfo->ts++, buf);
|
||||
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
pError("error in insert consume result, reason:%s\n", taos_errstr(pRes));
|
||||
taosFprintfFile(g_fp, "error in insert consume result, reason:%s\n", taos_errstr(pRes));
|
||||
taosCloseFile(&g_fp);
|
||||
taos_free_result(pRes);
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
|
||||
char buf[1024];
|
||||
int32_t totalRows = 0;
|
||||
|
||||
|
||||
// printf("topic: %s\n", tmq_get_topic_name(msg));
|
||||
// printf("vg:%d\n", tmq_get_vgroup_id(msg));
|
||||
taosFprintfFile(g_fp, "msg index:%" PRId64 ", threadLable: %d\n", msgIndex, threadLable);
|
||||
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), tmq_get_vgroup_id(msg));
|
||||
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
||||
|
||||
taosFprintfFile(g_fp, "msg index:%" PRId64 ", consumerId: %d\n", msgIndex, pInfo->consumerId);
|
||||
//taosFprintfFile(g_fp, "topic: %s, vgroupId: %d, tableName: %s\n", tmq_get_topic_name(msg), vgroupId, tmq_get_table_name(msg));
|
||||
taosFprintfFile(g_fp, "topic: %s, vgroupId: %d\n", tmq_get_topic_name(msg), vgroupId);
|
||||
|
||||
while (1) {
|
||||
TAOS_ROW row = taos_fetch_row(msg);
|
||||
|
@ -247,11 +304,16 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable)
|
|||
|
||||
if (0 != g_stConfInfo.showRowFlag) {
|
||||
taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
|
||||
if (0 != g_stConfInfo.saveRowFlag) {
|
||||
saveConsumeContentToTbl(pInfo, buf);
|
||||
}
|
||||
}
|
||||
|
||||
totalRows++;
|
||||
}
|
||||
|
||||
addRowsToVgroupId(pInfo, vgroupId, totalRows);
|
||||
|
||||
return totalRows;
|
||||
}
|
||||
|
||||
|
@ -344,6 +406,32 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
|
|||
|
||||
taos_free_result(pRes);
|
||||
|
||||
#if 0
|
||||
// vgroups
|
||||
for (i = 0; i < pInfo->numOfVgroups; i++) {
|
||||
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
||||
sprintf(sqlStr, "insert into %s.vgroup_%d values (%"PRId64", %d, %" PRId64 ", %" PRId64 ", %d)",
|
||||
g_stConfInfo.cdbName,
|
||||
now,
|
||||
pInfo->consumerId,
|
||||
pInfo->consumeMsgCnt,
|
||||
pInfo->consumeRowCnt,
|
||||
pInfo->checkresult);
|
||||
|
||||
char tmpString[128];
|
||||
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId ,sqlStr);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
|
||||
taos_free_result(pRes);
|
||||
exit(-1);
|
||||
}
|
||||
|
||||
taos_free_result(pRes);
|
||||
}
|
||||
#endif
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -356,11 +444,13 @@ void loop_consume(SThreadInfo* pInfo) {
|
|||
char tmpString[128];
|
||||
taosFprintfFile(g_fp, "%s consumer id %d start to loop pull msg\n", getCurrentTimeString(tmpString), pInfo->consumerId);
|
||||
|
||||
pInfo->ts = taosGetTimestampMs();
|
||||
|
||||
while (running) {
|
||||
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, g_stConfInfo.consumeDelay * 1000);
|
||||
if (tmqMsg) {
|
||||
if (0 != g_stConfInfo.showMsgFlag) {
|
||||
totalRows += msg_process(tmqMsg, totalMsgs, pInfo->consumerId);
|
||||
totalRows += msg_process(tmqMsg, pInfo, totalMsgs);
|
||||
}
|
||||
|
||||
taos_free_result(tmqMsg);
|
||||
|
|
Loading…
Reference in New Issue