From 766652a88aaba34b713439541c6a35f21781c7a3 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 19 Sep 2023 17:27:19 +0800 Subject: [PATCH] fix:vnode transform support in tmq --- .../dnode/vnode/src/tq/tqCheckInfoSnapshot.c | 29 +-- source/dnode/vnode/src/tq/tqHandleSnapshot.c | 41 ---- tests/parallel_test/cases.task | 2 +- tests/system-test/7-tmq/tmqVnodeTransform.py | 197 ++++++++++++++---- 4 files changed, 164 insertions(+), 105 deletions(-) diff --git a/source/dnode/vnode/src/tq/tqCheckInfoSnapshot.c b/source/dnode/vnode/src/tq/tqCheckInfoSnapshot.c index 346dcf5b50..a3bd22eef0 100644 --- a/source/dnode/vnode/src/tq/tqCheckInfoSnapshot.c +++ b/source/dnode/vnode/src/tq/tqCheckInfoSnapshot.c @@ -79,25 +79,11 @@ int32_t tqCheckInfoRead(STqCheckInfoReader* pReader, uint8_t** ppData) { void* pVal = NULL; int32_t kLen = 0; int32_t vLen = 0; -// SDecoder decoder; -// STqCheckInfo info; -// *ppData = NULL; if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { goto _exit; } -// tDecoderInit(&decoder, (uint8_t*)pVal, vLen); -// if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { -// tdbFree(pKey); -// tdbFree(pVal); -// code = TSDB_CODE_OUT_OF_MEMORY; -// goto _err; -// } -// tdbFree(pKey); -// tdbFree(pVal); -// tDecoderClear(&decoder); -// *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); if (*ppData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -175,20 +161,13 @@ int32_t tqCheckInfoWriterClose(STqCheckInfoWriter** ppWriter, int8_t rollback) { if (code) goto _err; } - int vgId = TD_VID(pWriter->pTq->pVnode); - taosMemoryFree(pWriter); *ppWriter = NULL; - // restore from metastore - if (tqMetaRestoreCheckInfo(pTq) < 0) { - goto _err; - } - return code; _err: - tqError("vgId:%d, tq check info writer close failed since %s", vgId, tstrerror(code)); + tqError("vgId:%d, tq check info writer close failed since %s", TD_VID(pTq->pVnode), tstrerror(code)); return code; } @@ -199,11 +178,13 @@ int32_t tqCheckInfoWrite(STqCheckInfoWriter* pWriter, uint8_t* pData, 
uint32_t n SDecoder decoder; SDecoder* pDecoder = &decoder; - + tDecoderInit(pDecoder, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); code = tDecodeSTqCheckInfo(pDecoder, &info); if (code) goto _err; + code = taosHashPut(pTq->pCheckInfo, info.topic, strlen(info.topic), &info, sizeof(STqCheckInfo)); + if (code) goto _err; code = tqMetaSaveCheckInfo(pTq, info.topic, pData + sizeof(SSnapDataHdr), nData - sizeof(SSnapDataHdr)); - if (code < 0) goto _err; + if (code) goto _err; tDecoderClear(pDecoder); return code; diff --git a/source/dnode/vnode/src/tq/tqHandleSnapshot.c b/source/dnode/vnode/src/tq/tqHandleSnapshot.c index 93b8a0398a..7d3e2f7837 100644 --- a/source/dnode/vnode/src/tq/tqHandleSnapshot.c +++ b/source/dnode/vnode/src/tq/tqHandleSnapshot.c @@ -79,47 +79,11 @@ int32_t tqSnapRead(STqSnapReader* pReader, uint8_t** ppData) { void* pVal = NULL; int32_t kLen = 0; int32_t vLen = 0; -// SDecoder decoder; -// STqHandle handle; - -// *ppData = NULL; if (tdbTbcNext(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { goto _exit; } -// tDecoderInit(&decoder, (uint8_t*)pVal, vLen); -// if (tDecodeSTqCheckInfo(&decoder, &info) < 0) { -// tdbFree(pKey); -// tdbFree(pVal); -// code = TSDB_CODE_OUT_OF_MEMORY; -// goto _err; -// } -// tdbFree(pKey); -// tdbFree(pVal); -// tDecoderClear(&decoder); - -// *ppData = NULL; -// for (;;) { -// if (tdbTbcGet(pReader->pCur, &pKey, &kLen, &pVal, &vLen)) { -// goto _exit; -// } -// -// tDecoderInit(&decoder, (uint8_t*)pVal, vLen); -// tDecodeSTqHandle(&decoder, &handle); -// tDecoderClear(&decoder); -// -// tqInfo("vgId:%d, vnode snapshot tq start read data, version:%" PRId64 " subKey: %s vLen:%d, sver:%"PRId64 " , ever:%" PRId64, TD_VID(pReader->pTq->pVnode), -// handle.snapshotVer, handle.subKey, vLen, -// pReader->sver, pReader->ever); -// if (handle.snapshotVer <= pReader->sver && handle.snapshotVer >= pReader->ever) { -// tdbTbcMoveToNext(pReader->pCur); -// break; -// } else { -// tdbTbcMoveToNext(pReader->pCur); -// } 
-// } - *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + vLen); if (*ppData == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -198,11 +162,6 @@ int32_t tqSnapWriterClose(STqSnapWriter** ppWriter, int8_t rollback) { taosMemoryFree(pWriter); *ppWriter = NULL; -// // restore from metastore -// if (tqMetaRestoreHandle(pTq) < 0) { -// goto _err; -// } - return code; _err: diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index baaf0d406f..9c6cd5c99a 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -1269,4 +1269,4 @@ ,,n,docs-examples-test,bash csharp.sh ,,n,docs-examples-test,bash jdbc.sh ,,n,docs-examples-test,bash go.sh -,,n,docs-examples-test,bash test_R.sh \ No newline at end of file +,,n,docs-examples-test,bash test_R.sh diff --git a/tests/system-test/7-tmq/tmqVnodeTransform.py b/tests/system-test/7-tmq/tmqVnodeTransform.py index fea459350c..8db9ce0e13 100644 --- a/tests/system-test/7-tmq/tmqVnodeTransform.py +++ b/tests/system-test/7-tmq/tmqVnodeTransform.py @@ -76,6 +76,45 @@ class TDTestCase: # tdSql.query("flush database %s"%(paraDict['dbName'])) return + def restartAndRemoveWal(self): + tdDnodes = cluster.dnodes + tdSql.query("select * from information_schema.ins_vnodes") + for result in tdSql.queryResult: + if result[2] == 'dbt': + tdLog.debug("dnode is %d"%(result[0])) + dnodeId = result[0] + vnodeId = result[1] + + tdDnodes[dnodeId - 1].stoptaosd() + time.sleep(1) + dataPath = self.getDataPath() + dataPath = dataPath%(dnodeId,vnodeId) + os.system('rm -rf ' + dataPath) + tdLog.debug("dataPath:%s"%dataPath) + tdDnodes[dnodeId - 1].starttaosd() + time.sleep(1) + break + tdLog.debug("restart dnode ok") + + def redistributeVgroups(self): + dnodesList = [] + tdSql.query("show dnodes") + for result in tdSql.queryResult: + dnodesList.append(result[0]) + + tdSql.query("select * from information_schema.ins_vnodes") + vnodeId = 0 + for result in tdSql.queryResult: + if result[2] == 'dbt': + tdLog.debug("dnode 
is %d"%(result[0])) + dnodesList.remove(result[0]) + vnodeId = result[1] + break + redistributeSql = "redistribute vgroup %d dnode %d" %(vnodeId, dnodesList[0]) + tdLog.debug("redistributeSql:%s"%(redistributeSql)) + tdSql.query(redistributeSql) + tdLog.debug("redistributeSql ok") + def tmqCase1(self): tdLog.printNoPrefix("======== test case 1: ") paraDict = {'dbName': 'dbt', @@ -104,7 +143,7 @@ class TDTestCase: topicNameList = ['topic1'] # expectRowsList = [] - tmqCom.initConsumerTable("cdb", self.replicaVar) + tmqCom.initConsumerTable() tdLog.info("create topics from stb with filter") queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) @@ -118,7 +157,7 @@ class TDTestCase: # init consume info, and start tmq_sim, then check consume result tdLog.info("insert consume info to consume processor") consumerId = 0 - expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2 topicList = topicNameList[0] ifcheckdata = 1 ifManualCommit = 1 @@ -139,43 +178,10 @@ class TDTestCase: tmqCom.getStartCommitNotifyFromTmqsim() #restart dnode & remove wal - tdDnodes = cluster.dnodes - tdSql.query("select * from information_schema.ins_vnodes") - for result in tdSql.queryResult: - if result[2] == 'dbt': - tdLog.debug("dnode is %d"%(result[0])) - dnodeId = result[0] - vnodeId = result[1] - - tdDnodes[dnodeId - 1].stoptaosd() - time.sleep(1) - dataPath = self.getDataPath() - dataPath = dataPath%(dnodeId,vnodeId) - os.system('rm -rf ' + dataPath) - tdLog.debug("dataPath:%s"%dataPath) - tdDnodes[dnodeId - 1].starttaosd() - time.sleep(1) - break - tdLog.debug("restart dnode ok") + self.restartAndRemoveWal() # redistribute vgroup - dnodesList = [] - tdSql.query("show dnodes") - for result in tdSql.queryResult: - dnodesList.append(result[0]) - - tdSql.query("select * from information_schema.ins_vnodes") - vnodeId = 0 - for result in tdSql.queryResult: - if result[2] == 'dbt': - tdLog.debug("dnode is 
%d"%(result[0])) - dnodesList.remove(result[0]) - vnodeId = result[1] - break - redistributeSql = "redistribute vgroup %d dnode %d" %(vnodeId, dnodesList[0]) - tdLog.debug("redistributeSql:%s"%(redistributeSql)) - tdSql.query(redistributeSql) - tdLog.debug("redistributeSql ok") + self.redistributeVgroups(); tdLog.info("create ctb2") paraDict['ctbPrefix'] = "ctbn" @@ -189,8 +195,8 @@ class TDTestCase: expectRows = 1 resultList = tmqCom.selectConsumeResult(expectRows) - if expectrowcnt >= resultList[0]: - tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt, resultList[0])) + if expectrowcnt / 2 >= resultList[0]: + tdLog.info("expect consume rows: %d, act consume rows: %d"%(expectrowcnt / 2, resultList[0])) tdLog.exit("%d tmq consume rows error!"%consumerId) # tmqCom.checkFileContent(consumerId, queryString) @@ -201,12 +207,125 @@ class TDTestCase: tdLog.printNoPrefix("======== test case 1 end ...... ") + def tmqCase2(self): + tdLog.printNoPrefix("======== test case 2: ") + paraDict = {'dbName':'dbt'} + + ntbName = "ntb" + + topicNameList = ['topic2'] + tmqCom.initConsumerTable() + + sqlString = "create table %s.%s(ts timestamp, i nchar(8))" %(paraDict['dbName'], ntbName) + tdLog.info("create nomal table sql: %s"%sqlString) + tdSql.execute(sqlString) + + tdLog.info("create topics from nomal table") + queryString = "select * from %s.%s"%(paraDict['dbName'], ntbName) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + tdSql.query("flush database %s"%(paraDict['dbName'])) + #restart dnode & remove wal + self.restartAndRemoveWal() + + # redistribute vgroup + self.redistributeVgroups(); + + sqlString = "alter table %s.%s modify column i nchar(16)" %(paraDict['dbName'], ntbName) + tdLog.info("alter table sql: %s"%sqlString) + tdSql.error(sqlString) + + time.sleep(1) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + 
tdLog.printNoPrefix("======== test case 2 end ...... ") + + def tmqCase3(self): + tdLog.printNoPrefix("======== test case 3: ") + paraDict = {'dbName': 'dbt', + 'dropFlag': 1, + 'event': '', + 'vgroups': 1, + 'stbName': 'stbn', + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': 10, + 'rowsPerTbl': 10000, + 'batchNum': 10, + 'startTs': 1640966400000, # 2022-01-01 00:00:00.000 + 'pollDelay': 2, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0} + + paraDict['vgroups'] = self.vgroups + paraDict['ctbNum'] = self.ctbNum + paraDict['rowsPerTbl'] = self.rowsPerTbl + + topicNameList = ['topic3'] + tmqCom.initConsumerTable() + + tdLog.info("create stb") + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"]) + + tdLog.info("create ctb") + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("insert data") + tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"], + ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"], + startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx']) + + tdLog.info("create topics from stb with filter") + queryString = "select * from %s.%s"%(paraDict['dbName'], paraDict['stbName']) + sqlString = "create topic %s as %s" %(topicNameList[0], queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # init consume info, and 
start tmq_sim, then check consume result + tdLog.info("insert consume info to consume processor") + consumerId = 0 + expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] + topicList = topicNameList[0] + ifcheckdata = 1 + ifManualCommit = 1 + keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:200, auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + time.sleep(5) + #restart dnode & remove wal + self.restartAndRemoveWal() + + # redistribute vgroup + self.redistributeVgroups(); + + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot']) + tdLog.info("wait the consume result") + + time.sleep(10) + for i in range(len(topicNameList)): + tdSql.query("drop topic %s"%topicNameList[i]) + + tdLog.printNoPrefix("======== test case 3 end ...... ") + def run(self): tdSql.prepare() self.prepareTestEnv() self.tmqCase1() # self.tmqCase2() + # self.tmqCase3() def stop(self): tdSql.close()