Merge pull request #14036 from taosdata/test3.0/lihui
test: add notify between main script and comsume processor
This commit is contained in:
commit
3b9de239ce
|
@ -54,9 +54,11 @@ class TDTestCase:
|
||||||
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
||||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||||
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
|
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
|
||||||
|
tdSql.query("drop table if exists %s.notifyinfo "%(cdbName))
|
||||||
|
|
||||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||||
|
tdSql.query("create table %s.notifyinfo (ts timestamp, cmdid int, consumerid int)"%cdbName)
|
||||||
|
|
||||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||||
sql = "insert into %s.consumeinfo values "%cdbName
|
sql = "insert into %s.consumeinfo values "%cdbName
|
||||||
|
@ -64,6 +66,27 @@ class TDTestCase:
|
||||||
tdLog.info("consume info sql: %s"%sql)
|
tdLog.info("consume info sql: %s"%sql)
|
||||||
tdSql.query(sql)
|
tdSql.query(sql)
|
||||||
|
|
||||||
|
def getStartConsumeNotifyFromTmqsim(self,cdbName='cdb'):
|
||||||
|
while 1:
|
||||||
|
tdSql.query("select * from %s.notifyinfo"%cdbName)
|
||||||
|
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
||||||
|
if (tdSql.getRows() == 1) and (tdSql.getData(0, 1) == 0):
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
time.sleep(0.1)
|
||||||
|
return
|
||||||
|
|
||||||
|
def getStartCommitNotifyFromTmqsim(self,cdbName='cdb'):
|
||||||
|
while 1:
|
||||||
|
tdSql.query("select * from %s.notifyinfo"%cdbName)
|
||||||
|
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
||||||
|
if tdSql.getRows() == 2 :
|
||||||
|
print(tdSql.getData(0, 1), tdSql.getData(1, 1))
|
||||||
|
if tdSql.getData(1, 1) == 1:
|
||||||
|
break
|
||||||
|
time.sleep(0.1)
|
||||||
|
return
|
||||||
|
|
||||||
def selectConsumeResult(self,expectRows,cdbName='cdb'):
|
def selectConsumeResult(self,expectRows,cdbName='cdb'):
|
||||||
resultList=[]
|
resultList=[]
|
||||||
while 1:
|
while 1:
|
||||||
|
@ -72,7 +95,7 @@ class TDTestCase:
|
||||||
if tdSql.getRows() == expectRows:
|
if tdSql.getRows() == expectRows:
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
time.sleep(5)
|
time.sleep(1)
|
||||||
|
|
||||||
for i in range(expectRows):
|
for i in range(expectRows):
|
||||||
tdLog.info ("ts: %s, consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 0), tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
tdLog.info ("ts: %s, consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 0), tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||||
|
@ -207,7 +230,9 @@ class TDTestCase:
|
||||||
showRow = 1
|
showRow = 1
|
||||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
time.sleep(2)
|
tdLog.info("wait the notify info of start consume")
|
||||||
|
self.getStartConsumeNotifyFromTmqsim()
|
||||||
|
|
||||||
tdLog.info("pkill consume processor")
|
tdLog.info("pkill consume processor")
|
||||||
if (platform.system().lower() == 'windows'):
|
if (platform.system().lower() == 'windows'):
|
||||||
os.system("TASKKILL /F /IM tmq_sim.exe")
|
os.system("TASKKILL /F /IM tmq_sim.exe")
|
||||||
|
@ -282,14 +307,17 @@ class TDTestCase:
|
||||||
showRow = 1
|
showRow = 1
|
||||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
time.sleep(6)
|
# time.sleep(6)
|
||||||
|
tdLog.info("start to wait commit notify")
|
||||||
|
self.getStartCommitNotifyFromTmqsim()
|
||||||
|
|
||||||
tdLog.info("pkill consume processor")
|
tdLog.info("pkill consume processor")
|
||||||
if (platform.system().lower() == 'windows'):
|
if (platform.system().lower() == 'windows'):
|
||||||
os.system("TASKKILL /F /IM tmq_sim.exe")
|
os.system("TASKKILL /F /IM tmq_sim.exe")
|
||||||
else:
|
else:
|
||||||
os.system('pkill tmq_sim')
|
os.system('pkill tmq_sim')
|
||||||
expectRows = 0
|
# expectRows = 0
|
||||||
resultList = self.selectConsumeResult(expectRows)
|
# resultList = self.selectConsumeResult(expectRows)
|
||||||
|
|
||||||
# wait for data ready
|
# wait for data ready
|
||||||
prepareEnvThread.join()
|
prepareEnvThread.join()
|
||||||
|
|
|
@ -34,6 +34,12 @@
|
||||||
#define MAX_CONSUMER_THREAD_CNT (16)
|
#define MAX_CONSUMER_THREAD_CNT (16)
|
||||||
#define MAX_VGROUP_CNT (32)
|
#define MAX_VGROUP_CNT (32)
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
NOTIFY_CMD_START_CONSUM,
|
||||||
|
NOTIFY_CMD_START_COMMIT,
|
||||||
|
NOTIFY_CMD_ID_BUTT
|
||||||
|
}NOTIFY_CMD_ID;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
TdThread thread;
|
TdThread thread;
|
||||||
int32_t consumerId;
|
int32_t consumerId;
|
||||||
|
@ -67,6 +73,8 @@ typedef struct {
|
||||||
int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume
|
int32_t rowsOfPerVgroups[MAX_VGROUP_CNT][2]; // [i][0]: vgroup id, [i][1]: rows of consume
|
||||||
int64_t ts;
|
int64_t ts;
|
||||||
|
|
||||||
|
TAOS* taos;
|
||||||
|
|
||||||
} SThreadInfo;
|
} SThreadInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
@ -339,8 +347,37 @@ int queryDB(TAOS* taos, char* command) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void appNothing(void* param, TAOS_RES* res, int32_t numOfRows) {
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
|
||||||
|
char sqlStr[1024] = {0};
|
||||||
|
|
||||||
|
int64_t now = taosGetTimestampMs();
|
||||||
|
|
||||||
|
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
||||||
|
sprintf(sqlStr, "insert into %s.notifyinfo values (%"PRId64", %d, %d)",
|
||||||
|
g_stConfInfo.cdbName,
|
||||||
|
now,
|
||||||
|
cmdId,
|
||||||
|
pInfo->consumerId);
|
||||||
|
|
||||||
|
taos_query_a(pInfo->taos, sqlStr, appNothing, NULL);
|
||||||
|
|
||||||
|
taosFprintfFile(g_fp, "notifyMainScript success, sql: %s\n", sqlStr);
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static int32_t g_once_commit_flag = 0;
|
||||||
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||||
pError("tmq_commit_cb_print() commit %d\n", code);
|
pError("tmq_commit_cb_print() commit %d\n", code);
|
||||||
|
|
||||||
|
if (0 == g_once_commit_flag) {
|
||||||
|
g_once_commit_flag = 1;
|
||||||
|
notifyMainScript((SThreadInfo*)param, (int32_t)NOTIFY_CMD_START_COMMIT);
|
||||||
|
}
|
||||||
|
taosFprintfFile(g_fp, "tmq_commit_cb_print() be called\n");
|
||||||
}
|
}
|
||||||
|
|
||||||
void build_consumer(SThreadInfo* pInfo) {
|
void build_consumer(SThreadInfo* pInfo) {
|
||||||
|
@ -353,7 +390,7 @@ void build_consumer(SThreadInfo* pInfo) {
|
||||||
|
|
||||||
// tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
|
// tmq_conf_set(conf, "td.connect.db", g_stConfInfo.dbName);
|
||||||
|
|
||||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, pInfo);
|
||||||
|
|
||||||
// tmq_conf_set(conf, "group.id", "cgrp1");
|
// tmq_conf_set(conf, "group.id", "cgrp1");
|
||||||
for (int32_t i = 0; i < pInfo->numOfKey; i++) {
|
for (int32_t i = 0; i < pInfo->numOfKey; i++) {
|
||||||
|
@ -392,9 +429,6 @@ void build_topic_list(SThreadInfo* pInfo) {
|
||||||
int32_t saveConsumeResult(SThreadInfo* pInfo) {
|
int32_t saveConsumeResult(SThreadInfo* pInfo) {
|
||||||
char sqlStr[1024] = {0};
|
char sqlStr[1024] = {0};
|
||||||
|
|
||||||
TAOS* pConn = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
|
||||||
assert(pConn != NULL);
|
|
||||||
|
|
||||||
int64_t now = taosGetTimestampMs();
|
int64_t now = taosGetTimestampMs();
|
||||||
|
|
||||||
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
||||||
|
@ -404,7 +438,7 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
|
||||||
char tmpString[128];
|
char tmpString[128];
|
||||||
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr);
|
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId, sqlStr);
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
TAOS_RES* pRes = taos_query(pInfo->taos, sqlStr);
|
||||||
if (taos_errno(pRes) != 0) {
|
if (taos_errno(pRes) != 0) {
|
||||||
pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
|
pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
@ -413,38 +447,14 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
|
||||||
|
|
||||||
taos_free_result(pRes);
|
taos_free_result(pRes);
|
||||||
|
|
||||||
#if 0
|
|
||||||
// vgroups
|
|
||||||
for (i = 0; i < pInfo->numOfVgroups; i++) {
|
|
||||||
// schema: ts timestamp, consumerid int, consummsgcnt bigint, checkresult int
|
|
||||||
sprintf(sqlStr, "insert into %s.vgroup_%d values (%"PRId64", %d, %" PRId64 ", %" PRId64 ", %d)",
|
|
||||||
g_stConfInfo.cdbName,
|
|
||||||
now,
|
|
||||||
pInfo->consumerId,
|
|
||||||
pInfo->consumeMsgCnt,
|
|
||||||
pInfo->consumeRowCnt,
|
|
||||||
pInfo->checkresult);
|
|
||||||
|
|
||||||
char tmpString[128];
|
|
||||||
taosFprintfFile(g_fp, "%s, consume id %d result: %s\n", getCurrentTimeString(tmpString), pInfo->consumerId ,sqlStr);
|
|
||||||
|
|
||||||
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
|
||||||
if (taos_errno(pRes) != 0) {
|
|
||||||
pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
|
|
||||||
taos_free_result(pRes);
|
|
||||||
exit(-1);
|
|
||||||
}
|
|
||||||
|
|
||||||
taos_free_result(pRes);
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void loop_consume(SThreadInfo* pInfo) {
|
void loop_consume(SThreadInfo* pInfo) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
|
||||||
|
int32_t once_flag = 0;
|
||||||
|
|
||||||
int64_t totalMsgs = 0;
|
int64_t totalMsgs = 0;
|
||||||
int64_t totalRows = 0;
|
int64_t totalRows = 0;
|
||||||
|
|
||||||
|
@ -465,6 +475,11 @@ void loop_consume(SThreadInfo* pInfo) {
|
||||||
|
|
||||||
totalMsgs++;
|
totalMsgs++;
|
||||||
|
|
||||||
|
if (0 == once_flag) {
|
||||||
|
once_flag = 1;
|
||||||
|
notifyMainScript(pInfo, NOTIFY_CMD_START_CONSUM);
|
||||||
|
}
|
||||||
|
|
||||||
if (totalRows >= pInfo->expectMsgCnt) {
|
if (totalRows >= pInfo->expectMsgCnt) {
|
||||||
char tmpString[128];
|
char tmpString[128];
|
||||||
taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
|
taosFprintfFile(g_fp, "%s over than expect rows, so break consume\n", getCurrentTimeString(tmpString));
|
||||||
|
@ -489,6 +504,12 @@ void* consumeThreadFunc(void* param) {
|
||||||
|
|
||||||
SThreadInfo* pInfo = (SThreadInfo*)param;
|
SThreadInfo* pInfo = (SThreadInfo*)param;
|
||||||
|
|
||||||
|
pInfo->taos = taos_connect(NULL, "root", "taosdata", NULL, 0);
|
||||||
|
if (pInfo->taos == NULL) {
|
||||||
|
taosFprintfFile(g_fp, "taos_connect() fail, can not notify and save consume result to main scripte\n");
|
||||||
|
exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
build_consumer(pInfo);
|
build_consumer(pInfo);
|
||||||
build_topic_list(pInfo);
|
build_topic_list(pInfo);
|
||||||
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
|
if ((NULL == pInfo->tmq) || (NULL == pInfo->topicList)) {
|
||||||
|
@ -508,7 +529,6 @@ void* consumeThreadFunc(void* param) {
|
||||||
loop_consume(pInfo);
|
loop_consume(pInfo);
|
||||||
|
|
||||||
if (pInfo->ifManualCommit) {
|
if (pInfo->ifManualCommit) {
|
||||||
taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n");
|
|
||||||
pPrint("tmq_commit() manual commit when consume end.\n");
|
pPrint("tmq_commit() manual commit when consume end.\n");
|
||||||
/*tmq_commit(pInfo->tmq, NULL, 0);*/
|
/*tmq_commit(pInfo->tmq, NULL, 0);*/
|
||||||
tmq_commit_sync(pInfo->tmq, NULL);
|
tmq_commit_sync(pInfo->tmq, NULL);
|
||||||
|
@ -539,6 +559,9 @@ void* consumeThreadFunc(void* param) {
|
||||||
taosFprintfFile(g_fp, "vgroups: %04d, rows: %d\n", pInfo->rowsOfPerVgroups[i][0], pInfo->rowsOfPerVgroups[i][1]);
|
taosFprintfFile(g_fp, "vgroups: %04d, rows: %d\n", pInfo->rowsOfPerVgroups[i][0], pInfo->rowsOfPerVgroups[i][1]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
taos_close(pInfo->taos);
|
||||||
|
pInfo->taos = NULL;
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue