test: modify test case of tmq
This commit is contained in:
parent
3398bf44e9
commit
1228521276
|
@ -52,7 +52,7 @@ class TDTestCase:
|
|||
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
|
||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||
tsql.execute("use %s" %dbName)
|
||||
tsql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
|
||||
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
|
||||
pre_create = "create table"
|
||||
sql = pre_create
|
||||
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
|
||||
|
@ -345,11 +345,11 @@ class TDTestCase:
|
|||
after starting consumer, create ctables ")
|
||||
# create and start thread
|
||||
parameterDict = {'cfg': '', \
|
||||
'dbName': 'db2', \
|
||||
'dbName': 'db3', \
|
||||
'vgroups': 1, \
|
||||
'stbName': 'stb', \
|
||||
'ctbNum': 10, \
|
||||
'rowsPerTbl': 10000, \
|
||||
'rowsPerTbl': 30000, \
|
||||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
@ -374,22 +374,33 @@ class TDTestCase:
|
|||
break
|
||||
else:
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
tdLog.info("create stable2 for the seconde topic")
|
||||
parameterDict2 = {'cfg': '', \
|
||||
'dbName': 'db3', \
|
||||
'vgroups': 1, \
|
||||
'stbName': 'stb2', \
|
||||
'ctbNum': 10, \
|
||||
'rowsPerTbl': 30000, \
|
||||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict2['cfg'] = cfgPath
|
||||
tdSql.execute("create stable if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict2['dbName'], parameterDict2['stbName']))
|
||||
|
||||
tdLog.info("create topics from super table")
|
||||
topicFromStb = 'topic_stb_column2'
|
||||
topicFromCtb = 'topic_ctb_column2'
|
||||
topicFromStb = 'topic_stb_column3'
|
||||
topicFromStb2 = 'topic_stb_column32'
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName']))
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb2, parameterDict2['dbName'], parameterDict2['stbName']))
|
||||
|
||||
time.sleep(1)
|
||||
tdSql.query("show topics")
|
||||
topic1 = tdSql.getData(0 , 0)
|
||||
topic2 = tdSql.getData(1 , 0)
|
||||
tdLog.info("show topics: %s, %s"%(topic1, topic2))
|
||||
if topic1 != topicFromStb and topic1 != topicFromCtb:
|
||||
if topic1 != topicFromStb and topic1 != topicFromStb2:
|
||||
tdLog.exit("topic error1")
|
||||
if topic2 != topicFromStb and topic2 != topicFromCtb:
|
||||
if topic2 != topicFromStb and topic2 != topicFromStb2:
|
||||
tdLog.exit("topic error2")
|
||||
|
||||
tdLog.info("create consume info table and consume result table")
|
||||
|
@ -397,10 +408,9 @@ class TDTestCase:
|
|||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName)
|
||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||
|
||||
rowsOfNewCtb = 1000
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb
|
||||
topicList = topicFromStb
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"]
|
||||
topicList = topicFromStb + ',' + topicFromStb2
|
||||
ifcheckdata = 0
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:false,\
|
||||
|
@ -432,17 +442,13 @@ class TDTestCase:
|
|||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
||||
# create new child table and insert data
|
||||
newCtbName = 'newctb'
|
||||
tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"]))
|
||||
startTs = parameterDict["startTs"]
|
||||
for j in range(rowsOfNewCtb):
|
||||
sql = "insert into %s.%s values (%d, %d, 'tmqrow_%d') "%(parameterDict["dbName"], newCtbName, startTs + j, j, j)
|
||||
tdSql.execute(sql)
|
||||
tdLog.debug("insert data into new child table ............ [OK]")
|
||||
# start the second thread to create new child table and insert data
|
||||
prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2)
|
||||
prepareEnvThread2.start()
|
||||
|
||||
# wait for data ready
|
||||
prepareEnvThread.join()
|
||||
prepareEnvThread2.join()
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
while 1:
|
||||
|
@ -457,7 +463,7 @@ class TDTestCase:
|
|||
tdSql.checkData(0 , 3, expectrowcnt)
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb)
|
||||
tdSql.query("drop topic %s"%topicFromCtb)
|
||||
tdSql.query("drop topic %s"%topicFromStb2)
|
||||
|
||||
tdLog.printNoPrefix("======== test case 3 end ...... ")
|
||||
|
||||
|
@ -474,7 +480,7 @@ class TDTestCase:
|
|||
|
||||
self.tmqCase1(cfgPath, buildPath)
|
||||
self.tmqCase2(cfgPath, buildPath)
|
||||
#self.tmqCase3(cfgPath, buildPath)
|
||||
self.tmqCase3(cfgPath, buildPath)
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
|
|
|
@ -37,9 +37,10 @@ typedef struct {
|
|||
TdThread thread;
|
||||
int32_t consumerId;
|
||||
|
||||
int32_t autoCommitIntervalMs; // 1000 ms
|
||||
char autoCommit[8]; // true, false
|
||||
char autoOffsetRest[16]; // none, earliest, latest
|
||||
int32_t ifManualCommit;
|
||||
//int32_t autoCommitIntervalMs; // 1000 ms
|
||||
//char autoCommit[8]; // true, false
|
||||
//char autoOffsetRest[16]; // none, earliest, latest
|
||||
|
||||
int32_t ifCheckData;
|
||||
int64_t expectMsgCnt;
|
||||
|
@ -136,9 +137,9 @@ void saveConfigToLogFile() {
|
|||
|
||||
for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) {
|
||||
taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId);
|
||||
taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit);
|
||||
taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
|
||||
taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
|
||||
//taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit);
|
||||
//taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs);
|
||||
//taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest);
|
||||
taosFprintfFile(g_fp, " Topics: ");
|
||||
for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) {
|
||||
taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]);
|
||||
|
@ -232,13 +233,18 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable)
|
|||
|
||||
while (1) {
|
||||
TAOS_ROW row = taos_fetch_row(msg);
|
||||
if (row == NULL) break;
|
||||
if (0 != g_stConfInfo.showRowFlag) {
|
||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||
int32_t numOfFields = taos_field_count(msg);
|
||||
taos_print_row(buf, row, fields, numOfFields);
|
||||
|
||||
if (row == NULL) break;
|
||||
|
||||
TAOS_FIELD* fields = taos_fetch_fields(msg);
|
||||
int32_t numOfFields = taos_field_count(msg);
|
||||
|
||||
taos_print_row(buf, row, fields, numOfFields);
|
||||
|
||||
if (0 != g_stConfInfo.showRowFlag) {
|
||||
taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf);
|
||||
}
|
||||
|
||||
totalRows++;
|
||||
}
|
||||
|
||||
|
@ -316,6 +322,8 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) {
|
|||
sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName,
|
||||
pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult);
|
||||
|
||||
taosFprintfFile(g_fp, "== save result sql: %s \n", sqlStr);
|
||||
|
||||
TAOS_RES* pRes = taos_query(pConn, sqlStr);
|
||||
if (taos_errno(pRes) != 0) {
|
||||
pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes));
|
||||
|
@ -384,8 +392,12 @@ void* consumeThreadFunc(void* param) {
|
|||
|
||||
loop_consume(pInfo);
|
||||
|
||||
tmq_commit(pInfo->tmq, NULL, 0);
|
||||
|
||||
if (pInfo->ifManualCommit) {
|
||||
taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n");
|
||||
pPrint("tmq_commit() manual commit when consume end.\n");
|
||||
tmq_commit(pInfo->tmq, NULL, 0);
|
||||
}
|
||||
|
||||
err = tmq_unsubscribe(pInfo->tmq);
|
||||
if (err) {
|
||||
pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
|
||||
|
@ -470,9 +482,9 @@ int32_t getConsumeInfo() {
|
|||
int32_t* lengths = taos_fetch_lengths(pRes);
|
||||
|
||||
// set default value
|
||||
g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
|
||||
memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
|
||||
memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
|
||||
//g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000;
|
||||
//memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true"));
|
||||
//memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast"));
|
||||
|
||||
for (int i = 0; i < num_fields; ++i) {
|
||||
if (row[i] == NULL || 0 == i) {
|
||||
|
@ -489,12 +501,8 @@ int32_t getConsumeInfo() {
|
|||
g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]);
|
||||
} else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
|
||||
g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]);
|
||||
} else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
|
||||
memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, row[i], lengths[i]);
|
||||
} else if ((7 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
|
||||
g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = *((int32_t*)row[i]);
|
||||
} else if ((8 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) {
|
||||
memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, row[i], lengths[i]);
|
||||
} else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) {
|
||||
g_stConfInfo.stThreads[numOfThread].ifManualCommit = *((int32_t*)row[i]);
|
||||
}
|
||||
}
|
||||
numOfThread++;
|
||||
|
|
Loading…
Reference in New Issue