add test case for tmq in vnode transform & fix core in race condition for pTq->pExecStore

This commit is contained in:
wangmm0220 2023-09-18 11:27:03 +08:00
parent 908cf88414
commit 7869b693f2
11 changed files with 503 additions and 7 deletions

View File

@ -1415,6 +1415,8 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
STqOffsetVal offsetNew = {0};
offsetNew.type = tmq->resetOffsetCfg;
tscInfo("consumer:0x%" PRIx64 ", update topic:%s, new numOfVgs:%d, num:%d, port:%d", tmq->consumerId, pTopic->topicName, vgNumGet, pVgEp->epSet.numOfEps,pVgEp->epSet.eps[pVgEp->epSet.inUse].port);
SMqClientVg clientVg = {
.pollCnt = 0,
.vgId = pVgEp->vgId,

View File

@ -657,7 +657,7 @@ int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
}
// check topic existence
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_TOPIC, pMsg, "subscribe");
pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, pMsg, "subscribe");
if (pTrans == NULL) {
goto _over;
}

View File

@ -670,7 +670,14 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
STqHandle* pHandle = NULL;
while (1) {
pHandle = taosHashGet(pTq->pHandle, req.subKey, strlen(req.subKey));
if (pHandle || tqMetaGetHandle(pTq, req.subKey) < 0) {
if (pHandle) {
break;
}
taosRLockLatch(&pTq->lock);
ret = tqMetaGetHandle(pTq, req.subKey);
taosRUnLockLatch(&pTq->lock);
if (ret < 0) {
break;
}
}
@ -690,7 +697,9 @@ int32_t tqProcessSubscribeReq(STQ* pTq, int64_t sversion, char* msg, int32_t msg
tqDestroyTqHandle(&handle);
goto end;
}
taosWLockLatch(&pTq->lock);
ret = tqMetaSaveHandle(pTq, req.subKey, &handle);
taosWUnLockLatch(&pTq->lock);
} else {
while(1){
taosWLockLatch(&pTq->lock);

View File

@ -198,7 +198,9 @@ int32_t tqSnapWrite(STqSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr));
code = tDecodeSTqHandle(pDecoder, &handle);
if (code) goto _err;
taosWLockLatch(&pTq->lock);
code = tqMetaSaveHandle(pTq, handle.subKey, &handle);
taosWUnLockLatch(&pTq->lock);
if (code < 0) goto _err;
tDecoderClear(pDecoder);

View File

@ -1087,6 +1087,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
int32_t vgId = TD_VID(pTq->pVnode);
// update the table list for each consumer handle
taosWLockLatch(&pTq->lock);
while (1) {
pIter = taosHashIterate(pTq->pHandle, pIter);
if (pIter == NULL) {
@ -1116,6 +1117,8 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
tqError("qGetTableList in tqUpdateTbUidList error:%d handle %s consumer:0x%" PRIx64, ret, pTqHandle->subKey, pTqHandle->consumerId);
taosArrayDestroy(list);
taosHashCancelIterate(pTq->pHandle, pIter);
taosWUnLockLatch(&pTq->lock);
return ret;
}
tqReaderSetTbUidList(pTqHandle->execHandle.pTqReader, list, NULL);
@ -1125,6 +1128,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
}
}
}
taosWUnLockLatch(&pTq->lock);
// update the table list handle for each stream scanner/wal reader
taosWLockLatch(&pTq->pStreamMeta->lock);

View File

@ -417,6 +417,7 @@ void transThreadOnce();
void transInit();
void transCleanup();
void transPrintEpSet(SEpSet* pEpSet);
void transFreeMsg(void* msg);
int32_t transCompressMsg(char* msg, int32_t len);

View File

@ -2221,7 +2221,9 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
}
} else {
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
tDebug("epset not equal, retry new epset");
tDebug("epset not equal, retry new epset1");
transPrintEpSet(&pCtx->epSet);
transPrintEpSet(&epSet);
epsetAssign(&pCtx->epSet, &epSet);
noDelay = false;
} else {
@ -2246,7 +2248,9 @@ bool cliResetEpset(STransConnCtx* pCtx, STransMsg* pResp, bool hasEpSet) {
}
} else {
if (!transEpSetIsEqual(&pCtx->epSet, &epSet)) {
tDebug("epset not equal, retry new epset");
tDebug("epset not equal, retry new epset2");
transPrintEpSet(&pCtx->epSet);
transPrintEpSet(&epSet);
epsetAssign(&pCtx->epSet, &epSet);
noDelay = false;
} else {

View File

@ -54,7 +54,7 @@ class ConfigureyCluster:
# configure dnoe of independent mnodes
if num <= self.mnodeNums and self.mnodeNums != 0 and independentMnode == True :
tdLog.info(f"set mnode:{num} supportVnodes 0")
dnode.addExtraCfg("supportVnodes", 0)
# dnode.addExtraCfg("supportVnodes", 0)
# print(dnode)
self.dnodes.append(dnode)
return self.dnodes

View File

@ -45,9 +45,9 @@ class TMQCom:
tdSql.init(conn.cursor())
# tdSql.init(conn.cursor(), logSql) # output sql.txt file
def initConsumerTable(self,cdbName='cdb'):
def initConsumerTable(self,cdbName='cdb', replicaVar=1):
tdLog.info("create consume database, and consume info table, and consume result table")
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
tdSql.query("create database if not exists %s vgroups 1 replica %d"%(cdbName,replicaVar))
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
tdSql.query("drop table if exists %s.notifyinfo "%(cdbName))

View File

@ -0,0 +1,263 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from util.cluster import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 20,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql.query("flush database %s"%(paraDict['dbName']))
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctbn',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 20,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
topicNameList = ['topic1']
# expectRowsList = []
tmqCom.initConsumerTable("cdb", self.replicaVar)
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
tmqCom.getStartConsumeNotifyFromTmqsim()
tmqCom.getStartCommitNotifyFromTmqsim()
tdSql.query("select * from information_schema.ins_vnodes")
# tdLog.debug(tdSql.queryResult)
tdDnodes = cluster.dnodes
for result in tdSql.queryResult:
if result[2] == 'dbt' and result[3] == 'leader':
tdLog.debug("leader is %d"%(result[0] - 1))
tdDnodes[result[0] - 1].stoptaosd()
break
pInsertThread.join()
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectrowcnt != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectRowsList[0], resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)
# tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
# def tmqCase2(self):
# tdLog.printNoPrefix("======== test case 2: ")
# paraDict = {'dbName': 'dbt',
# 'dropFlag': 1,
# 'event': '',
# 'vgroups': 1,
# 'stbName': 'stb',
# 'colPrefix': 'c',
# 'tagPrefix': 't',
# 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
# 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
# 'ctbPrefix': 'ctb',
# 'ctbStartIdx': 0,
# 'ctbNum': 10,
# 'rowsPerTbl': 10000,
# 'batchNum': 10,
# 'startTs': 1640966400000, # 2022-01-01 00:00:00.000
# 'pollDelay': 3,
# 'showMsg': 1,
# 'showRow': 1,
# 'snapshot': 1}
#
# paraDict['vgroups'] = self.vgroups
# paraDict['ctbNum'] = self.ctbNum
# paraDict['rowsPerTbl'] = self.rowsPerTbl
#
# topicNameList = ['topic1']
# expectRowsList = []
# tmqCom.initConsumerTable()
#
# tdLog.info("create topics from stb with filter")
# queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# # sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
# sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
# tdLog.info("create topic sql: %s"%sqlString)
# tdSql.execute(sqlString)
# tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows())
# totalRowsInserted = expectRowsList[0]
#
# # init consume info, and start tmq_sim, then check consume result
# tdLog.info("insert consume info to consume processor")
# consumerId = 1
# expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] / 3)
# topicList = topicNameList[0]
# ifcheckdata = 1
# ifManualCommit = 1
# keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
# tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
#
# tdLog.info("start consume processor 0")
# tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# tdLog.info("wait the consume result")
#
# expectRows = 1
# resultList = tmqCom.selectConsumeResult(expectRows)
#
# if not (expectrowcnt <= resultList[0] and totalRowsInserted >= resultList[0]):
# tdLog.info("act consume rows: %d, expect consume rows between %d and %d"%(resultList[0], expectrowcnt, totalRowsInserted))
# tdLog.exit("%d tmq consume rows error!"%consumerId)
#
# firstConsumeRows = resultList[0]
#
# # reinit consume info, and start tmq_sim, then check consume result
# tmqCom.initConsumerTable()
# consumerId = 2
# expectrowcnt = math.ceil(paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2/3)
# tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
#
# tdLog.info("start consume processor 1")
# tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
# tdLog.info("wait the consume result")
#
# expectRows = 1
# resultList = tmqCom.selectConsumeResult(expectRows)
#
# actConsumeTotalRows = firstConsumeRows + resultList[0]
#
# if not (expectrowcnt >= resultList[0] and totalRowsInserted == actConsumeTotalRows):
# tdLog.info("act consume rows, first: %d, second: %d "%(firstConsumeRows, resultList[0]))
# tdLog.info("and sum of two consume rows: %d should be equal to total inserted rows: %d"%(actConsumeTotalRows, totalRowsInserted))
# tdLog.exit("%d tmq consume rows error!"%consumerId)
#
# time.sleep(10)
# for i in range(len(topicNameList)):
# tdSql.query("drop topic %s"%topicNameList[i])
#
# tdLog.printNoPrefix("======== test case 2 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
# self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -0,0 +1,211 @@
import taos
import sys
import time
import socket
import os
import threading
import math
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from util.cluster import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
def __init__(self):
self.vgroups = 1
self.ctbNum = 10
self.rowsPerTbl = 10000
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor(), False)
def getDataPath(self):
selfPath = os.path.dirname(os.path.realpath(__file__))
return selfPath + '/../../../sim/dnode%d/data/vnode/vnode%d/wal/*';
def prepareTestEnv(self):
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctb',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 200,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tmqCom.initConsumerTable()
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], wal_retention_period=36000,vgroups=paraDict["vgroups"],replica=self.replicaVar)
tdLog.info("create stb")
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("restart taosd to ensure that the data falls into the disk")
# tdDnodes.stop(1)
# tdDnodes.start(1)
tdSql.query("flush database %s"%(paraDict['dbName']))
return
def tmqCase1(self):
tdLog.printNoPrefix("======== test case 1: ")
paraDict = {'dbName': 'dbt',
'dropFlag': 1,
'event': '',
'vgroups': 1,
'stbName': 'stb',
'colPrefix': 'c',
'tagPrefix': 't',
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
'ctbPrefix': 'ctbn',
'ctbStartIdx': 0,
'ctbNum': 10,
'rowsPerTbl': 10000,
'batchNum': 10,
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
'pollDelay': 200,
'showMsg': 1,
'showRow': 1,
'snapshot': 0}
paraDict['vgroups'] = self.vgroups
paraDict['ctbNum'] = self.ctbNum
paraDict['rowsPerTbl'] = self.rowsPerTbl
tdLog.info("create ctb")
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
tdLog.info("insert data")
pInsertThread = tmqCom.asyncInsertDataByInterlace(paraDict)
topicNameList = ['topic1']
# expectRowsList = []
tmqCom.initConsumerTable("cdb", self.replicaVar)
tdLog.info("create topics from stb with filter")
queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName'])
# sqlString = "create topic %s as stable %s" %(topicNameList[0], paraDict['stbName'])
sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
tdLog.info("create topic sql: %s"%sqlString)
tdSql.execute(sqlString)
# tdSql.query(queryString)
# expectRowsList.append(tdSql.getRows())
# init consume info, and start tmq_sim, then check consume result
tdLog.info("insert consume info to consume processor")
consumerId = 0
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
topicList = topicNameList[0]
ifcheckdata = 1
ifManualCommit = 1
keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest'
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
tdLog.info("start consume processor")
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
tdLog.info("wait the consume result")
#restart dnode & remove wal
tdDnodes = cluster.dnodes
tdSql.query("select * from information_schema.ins_vnodes")
for result in tdSql.queryResult:
if result[2] == 'dbt':
tdLog.debug("dnode is %d"%(result[0]))
dnodeId = result[0]
vnodeId = result[1]
tdDnodes[dnodeId - 1].stoptaosd()
time.sleep(2)
dataPath = self.getDataPath()
dataPath = dataPath%(dnodeId,vnodeId)
os.system('rm -rf ' + dataPath)
tdLog.debug("dataPath:%s"%dataPath)
tdDnodes[dnodeId - 1].starttaosd()
time.sleep(2)
break
tdLog.debug("restart dnode ok")
# redistribute vgroup
dnodesList = []
tdSql.query("show dnodes")
for result in tdSql.queryResult:
dnodesList.append(result[0])
tdSql.query("select * from information_schema.ins_vnodes")
vnodeId = 0
for result in tdSql.queryResult:
if result[2] == 'dbt':
tdLog.debug("dnode is %d"%(result[0]))
dnodesList.remove(result[0])
vnodeId = result[1]
break
redistributeSql = "redistribute vgroup %d dnode %d" %(vnodeId, dnodesList[0])
tdLog.debug("redistributeSql:%s"%(redistributeSql))
time.sleep(10)
tdSql.query(redistributeSql)
tdLog.debug("redistributeSql ok")
tmqCom.getStartConsumeNotifyFromTmqsim()
tmqCom.getStartCommitNotifyFromTmqsim()
pInsertThread.join()
expectRows = 1
resultList = tmqCom.selectConsumeResult(expectRows)
if expectrowcnt != resultList[0]:
tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt, resultList[0]))
tdLog.exit("%d tmq consume rows error!"%consumerId)
# tmqCom.checkFileContent(consumerId, queryString)
time.sleep(10)
for i in range(len(topicNameList)):
tdSql.query("drop topic %s"%topicNameList[i])
tdLog.printNoPrefix("======== test case 1 end ...... ")
def run(self):
tdSql.prepare()
self.prepareTestEnv()
self.tmqCase1()
# self.tmqCase2()
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
event = threading.Event()
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())