From 1228521276b2a2296d97ff6f03f1d7d72295ff41 Mon Sep 17 00:00:00 2001 From: plum-lihui Date: Mon, 16 May 2022 16:38:43 +0800 Subject: [PATCH] test: modify test case of tmq --- tests/system-test/7-tmq/basic5.py | 52 +++++++++++++++++-------------- tests/test/c/tmqSim.c | 52 ++++++++++++++++++------------- 2 files changed, 59 insertions(+), 45 deletions(-) diff --git a/tests/system-test/7-tmq/basic5.py b/tests/system-test/7-tmq/basic5.py index 65840349ba..c2fe25efc4 100644 --- a/tests/system-test/7-tmq/basic5.py +++ b/tests/system-test/7-tmq/basic5.py @@ -52,7 +52,7 @@ class TDTestCase: def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl): tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups)) tsql.execute("use %s" %dbName) - tsql.execute("create table %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) + tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName) pre_create = "create table" sql = pre_create #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) @@ -345,11 +345,11 @@ class TDTestCase: after starting consumer, create ctables ") # create and start thread parameterDict = {'cfg': '', \ - 'dbName': 'db2', \ + 'dbName': 'db3', \ 'vgroups': 1, \ 'stbName': 'stb', \ 'ctbNum': 10, \ - 'rowsPerTbl': 10000, \ + 'rowsPerTbl': 30000, \ 'batchNum': 100, \ 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 parameterDict['cfg'] = cfgPath @@ -374,22 +374,33 @@ class TDTestCase: break else: time.sleep(1) - + + tdLog.info("create stable2 for the seconde topic") + parameterDict2 = {'cfg': '', \ + 'dbName': 'db3', \ + 'vgroups': 1, \ + 'stbName': 'stb2', \ + 'ctbNum': 10, \ + 'rowsPerTbl': 30000, \ + 'batchNum': 100, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict2['cfg'] = cfgPath + tdSql.execute("create stable if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict2['dbName'], parameterDict2['stbName'])) + tdLog.info("create topics from super table") - topicFromStb = 'topic_stb_column2' - topicFromCtb = 'topic_ctb_column2' + topicFromStb = 'topic_stb_column3' + topicFromStb2 = 'topic_stb_column32' tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb, parameterDict['dbName'], parameterDict['stbName'])) - tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s_0" %(topicFromCtb, parameterDict['dbName'], parameterDict['stbName'])) + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb2, parameterDict2['dbName'], parameterDict2['stbName'])) - time.sleep(1) tdSql.query("show topics") topic1 = tdSql.getData(0 , 0) topic2 = tdSql.getData(1 , 0) tdLog.info("show topics: %s, %s"%(topic1, topic2)) - if topic1 != topicFromStb and topic1 != topicFromCtb: + if topic1 != topicFromStb and topic1 != topicFromStb2: tdLog.exit("topic error1") - if topic2 != topicFromStb and topic2 != topicFromCtb: + if topic2 != topicFromStb and topic2 != topicFromStb2: tdLog.exit("topic error2") tdLog.info("create consume info table and consume result table") @@ -397,10 +408,9 @@ class TDTestCase: tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int)"%cdbName) tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) - rowsOfNewCtb = 1000 consumerId = 0 - expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + rowsOfNewCtb - topicList = topicFromStb + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + parameterDict2["rowsPerTbl"] * parameterDict2["ctbNum"] + topicList = topicFromStb + ',' + topicFromStb2 ifcheckdata = 0 keyList = 'group.id:cgrp1,\ enable.auto.commit:false,\ @@ -432,17 +442,13 @@ class TDTestCase: tdLog.info(shellCmd) os.system(shellCmd) - # create new child table and insert data - newCtbName = 'newctb' - tdSql.query("create table %s.%s using %s.%s tags(9999)"%(parameterDict["dbName"], newCtbName, parameterDict["dbName"], parameterDict["stbName"])) - startTs = parameterDict["startTs"] - for j in range(rowsOfNewCtb): - sql = "insert into %s.%s values (%d, %d, 'tmqrow_%d') "%(parameterDict["dbName"], newCtbName, startTs + j, j, j) - tdSql.execute(sql) - tdLog.debug("insert data into new child table ............ [OK]") + # start the second thread to create new child table and insert data + prepareEnvThread2 = threading.Thread(target=self.prepareEnv, kwargs=parameterDict2) + prepareEnvThread2.start() # wait for data ready prepareEnvThread.join() + prepareEnvThread2.join() tdLog.info("insert process end, and start to check consume result") while 1: @@ -457,7 +463,7 @@ class TDTestCase: tdSql.checkData(0 , 3, expectrowcnt) tdSql.query("drop topic %s"%topicFromStb) - tdSql.query("drop topic %s"%topicFromCtb) + tdSql.query("drop topic %s"%topicFromStb2) tdLog.printNoPrefix("======== test case 3 end ...... ") @@ -474,7 +480,7 @@ class TDTestCase: self.tmqCase1(cfgPath, buildPath) self.tmqCase2(cfgPath, buildPath) - #self.tmqCase3(cfgPath, buildPath) + self.tmqCase3(cfgPath, buildPath) def stop(self): tdSql.close() diff --git a/tests/test/c/tmqSim.c b/tests/test/c/tmqSim.c index 1228d6174c..b3dba695a7 100644 --- a/tests/test/c/tmqSim.c +++ b/tests/test/c/tmqSim.c @@ -37,9 +37,10 @@ typedef struct { TdThread thread; int32_t consumerId; - int32_t autoCommitIntervalMs; // 1000 ms - char autoCommit[8]; // true, false - char autoOffsetRest[16]; // none, earliest, latest + int32_t ifManualCommit; + //int32_t autoCommitIntervalMs; // 1000 ms + //char autoCommit[8]; // true, false + //char autoOffsetRest[16]; // none, earliest, latest int32_t ifCheckData; int64_t expectMsgCnt; @@ -136,9 +137,9 @@ void saveConfigToLogFile() { for (int32_t i = 0; i < g_stConfInfo.numOfThread; i++) { taosFprintfFile(g_fp, "# consumer %d info:\n", g_stConfInfo.stThreads[i].consumerId); - taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit); - taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs); - taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest); + //taosFprintfFile(g_fp, " auto commit: %s\n", g_stConfInfo.stThreads[i].autoCommit); + //taosFprintfFile(g_fp, " auto commit interval ms: %d\n", g_stConfInfo.stThreads[i].autoCommitIntervalMs); + //taosFprintfFile(g_fp, " auto offset rest: %s\n", g_stConfInfo.stThreads[i].autoOffsetRest); taosFprintfFile(g_fp, " Topics: "); for (int j = 0; j < g_stConfInfo.stThreads[i].numOfTopic; j++) { taosFprintfFile(g_fp, "%s, ", g_stConfInfo.stThreads[i].topics[j]); @@ -232,13 +233,18 @@ static int32_t msg_process(TAOS_RES* msg, int64_t msgIndex, int32_t threadLable) while (1) { TAOS_ROW row = taos_fetch_row(msg); - if (row == NULL) break; - if (0 != g_stConfInfo.showRowFlag) { - TAOS_FIELD* fields = taos_fetch_fields(msg); - int32_t numOfFields = taos_field_count(msg); - taos_print_row(buf, row, fields, numOfFields); + + if (row == NULL) break; + + TAOS_FIELD* fields = taos_fetch_fields(msg); + int32_t numOfFields = taos_field_count(msg); + + taos_print_row(buf, row, fields, numOfFields); + + if (0 != g_stConfInfo.showRowFlag) { taosFprintfFile(g_fp, "rows[%d]: %s\n", totalRows, buf); } + totalRows++; } @@ -316,6 +322,8 @@ int32_t saveConsumeResult(SThreadInfo* pInfo) { sprintf(sqlStr, "insert into %s.consumeresult values (now, %d, %" PRId64 ", %" PRId64 ", %d)", g_stConfInfo.cdbName, pInfo->consumerId, pInfo->consumeMsgCnt, pInfo->consumeRowCnt, pInfo->checkresult); + taosFprintfFile(g_fp, "== save result sql: %s \n", sqlStr); + TAOS_RES* pRes = taos_query(pConn, sqlStr); if (taos_errno(pRes) != 0) { pError("error in save consumeinfo, reason:%s\n", taos_errstr(pRes)); @@ -384,8 +392,12 @@ void* consumeThreadFunc(void* param) { loop_consume(pInfo); - tmq_commit(pInfo->tmq, NULL, 0); - + if (pInfo->ifManualCommit) { + taosFprintfFile(g_fp, "tmq_commit() manual commit when consume end.\n"); + pPrint("tmq_commit() manual commit when consume end.\n"); + tmq_commit(pInfo->tmq, NULL, 0); + } + err = tmq_unsubscribe(pInfo->tmq); if (err) { pError("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err)); @@ -470,9 +482,9 @@ int32_t getConsumeInfo() { int32_t* lengths = taos_fetch_lengths(pRes); // set default value - g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000; - memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true")); - memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast")); + //g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = 5000; + //memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, "true", strlen("true")); + //memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, "earlieast", strlen("earlieast")); for (int i = 0; i < num_fields; ++i) { if (row[i] == NULL || 0 == i) { @@ -489,12 +501,8 @@ int32_t getConsumeInfo() { g_stConfInfo.stThreads[numOfThread].expectMsgCnt = *((int64_t*)row[i]); } else if ((5 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { g_stConfInfo.stThreads[numOfThread].ifCheckData = *((int32_t*)row[i]); - } else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { - memcpy(g_stConfInfo.stThreads[numOfThread].autoCommit, row[i], lengths[i]); - } else if ((7 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { - g_stConfInfo.stThreads[numOfThread].autoCommitIntervalMs = *((int32_t*)row[i]); - } else if ((8 == i) && (fields[i].type == TSDB_DATA_TYPE_BINARY)) { - memcpy(g_stConfInfo.stThreads[numOfThread].autoOffsetRest, row[i], lengths[i]); + } else if ((6 == i) && (fields[i].type == TSDB_DATA_TYPE_INT)) { + g_stConfInfo.stThreads[numOfThread].ifManualCommit = *((int32_t*)row[i]); } } numOfThread++;