From 6244712887e79015240bef5cf21f3398ae88d505 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 23 Dec 2024 18:12:47 +0800 Subject: [PATCH] fix:[TD-33272]add test case --- source/dnode/vnode/src/tq/tqOffset.c | 3 +- source/dnode/vnode/test/tqTest.cpp | 4 +- .../7-tmq/tmqVnodeSplit-ntb-select.py | 192 ++++++++++++++++++ 3 files changed, 195 insertions(+), 4 deletions(-) create mode 100644 tests/system-test/7-tmq/tmqVnodeSplit-ntb-select.py diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 57a901a2e1..a131f30a04 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -26,8 +26,7 @@ int32_t tqBuildFName(char** data, const char* path, char* name) { int32_t len = strlen(path) + strlen(name) + 2; fname = taosMemoryCalloc(1, len); TSDB_CHECK_NULL(fname, code, lino, END, terrno); - code = tsnprintf(fname, len, "%s%s%s", path, TD_DIRSEP, name); - TSDB_CHECK_CODE(code, lino, END); + (void)tsnprintf(fname, len, "%s%s%s", path, TD_DIRSEP, name); *data = fname; fname = NULL; diff --git a/source/dnode/vnode/test/tqTest.cpp b/source/dnode/vnode/test/tqTest.cpp index 1b5fe4fdcd..d757aa6d43 100644 --- a/source/dnode/vnode/test/tqTest.cpp +++ b/source/dnode/vnode/test/tqTest.cpp @@ -37,11 +37,11 @@ int main(int argc, char **argv) { return RUN_ALL_TESTS(); } -STqOffset offset = {.subKey = "testtest", .val = {.type = TMQ_OFFSET__LOG, .version = 8923}}; - void tqWriteOffset() { TdFilePtr pFile = taosOpenFile(TQ_OFFSET_NAME, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); + STqOffset offset = {.val = {.type = TMQ_OFFSET__LOG, .version = 8923}}; + strcpy(offset.subKey, "testtest"); int32_t bodyLen; int32_t code; tEncodeSize(tEncodeSTqOffset, &offset, bodyLen, code); diff --git a/tests/system-test/7-tmq/tmqVnodeSplit-ntb-select.py b/tests/system-test/7-tmq/tmqVnodeSplit-ntb-select.py new file mode 100644 index 0000000000..8f8eb7d4ea --- /dev/null +++ b/tests/system-test/7-tmq/tmqVnodeSplit-ntb-select.py @@ -0,0 +1,192 @@ + +import taos +import sys +import time +import socket +import os +import threading +import math + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from util.cluster import * +sys.path.append("./7-tmq") +from tmqCommon import * + +from util.cluster import * +sys.path.append("./6-cluster") +from clusterCommonCreate import * +from clusterCommonCheck import clusterComCheck + + + +class TDTestCase: + def __init__(self): + self.vgroups = 1 + self.ctbNum = 10 + self.rowsPerTbl = 1000 + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) + + def getDataPath(self): + selfPath = tdCom.getBuildPath() + + return selfPath + '/../sim/dnode%d/data/vnode/vnode%d/wal/*'; + + def prepareTestEnv(self): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 1000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 120, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + tdCom.drop_all_db() + tmqCom.initConsumerTable() + tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar) + tdLog.info("create stb") + tdSql.query("create table dbt.t(ts timestamp, v int)") + tdSql.query("insert into dbt.t values('2022-01-01 00:00:00.000', 0)") + tdSql.query("insert into dbt.t values('2022-01-01 00:00:02.000', 0)") + tdSql.query("insert into dbt.t values('2022-01-01 00:00:03.000', 0)") + return + + def restartAndRemoveWal(self, deleteWal): + tdDnodes = cluster.dnodes + tdSql.query("select * from information_schema.ins_vnodes") + for result in tdSql.queryResult: + if result[2] == 'dbt': + tdLog.debug("dnode is %d"%(result[0])) + dnodeId = result[0] + vnodeId = result[1] + + tdDnodes[dnodeId - 1].stoptaosd() + time.sleep(1) + dataPath = self.getDataPath() + dataPath = dataPath%(dnodeId,vnodeId) + tdLog.debug("dataPath:%s"%dataPath) + if deleteWal: + if os.system('rm -rf ' + dataPath) != 0: + tdLog.exit("rm error") + + tdDnodes[dnodeId - 1].starttaosd() + time.sleep(1) + break + tdLog.debug("restart dnode ok") + + def splitVgroups(self): + tdSql.query("select * from information_schema.ins_vnodes") + vnodeId = 0 + for result in tdSql.queryResult: + if result[2] == 'dbt': + vnodeId = result[1] + tdLog.debug("vnode is %d"%(vnodeId)) + break + splitSql = "split vgroup %d" %(vnodeId) + tdLog.debug("splitSql:%s"%(splitSql)) + tdSql.query(splitSql) + tdLog.debug("splitSql ok") + + def tmqCase1(self, deleteWal=False): + tdLog.printNoPrefix("======== test case 1: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stb', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb1', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 1000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 2, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic1'] + # expectRowsList = [] + tmqCom.initConsumerTable() + + tdLog.info("create topics from ntb with filter") + queryString = "select * from %s.t"%(paraDict['dbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # init consume info, and start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + + tmqCom.getStartConsumeNotifyFromTmqsim() + tmqCom.getStartCommitNotifyFromTmqsim() + + #restart dnode & remove wal + self.restartAndRemoveWal(deleteWal) + + # split vgroup + self.splitVgroups() + + clusterComCheck.check_vgroups_status(vgroup_numbers=2,db_replica=self.replicaVar,db_name="dbt",count_number=240) + + time.sleep(3) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + tdLog.printNoPrefix("======== test case 1 end ...... ") + + def run(self): + self.prepareTestEnv() + self.tmqCase1(True) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())