From 5f7b6f19baeac0a5d4d30895f80e11ece6246328 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Sun, 8 Oct 2023 19:05:59 +0800 Subject: [PATCH] feat:[TD-26056] add replay logic --- source/client/src/clientTmq.c | 1 + source/dnode/vnode/inc/vnode.h | 3 +- source/libs/executor/src/projectoperator.c | 3 +- tests/system-test/7-tmq/replay.py | 320 +++++++++++++++++++++ 4 files changed, 325 insertions(+), 2 deletions(-) create mode 100644 tests/system-test/7-tmq/replay.py diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 50b8eb1eca..252959ecd8 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1532,6 +1532,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl pReq->head.vgId = pVg->vgId; pReq->useSnapshot = tmq->useSnapshot; pReq->reqId = generateRequestId(); + pReq->enableReplay = tmq->replayEnable; } SMqMetaRspObj* tmqBuildMetaRspFromWrapper(SMqPollRspWrapper* pWrapper) { diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 122f6733fb..787b717015 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -216,7 +216,7 @@ typedef struct STqReader { SPackedData msg; SSubmitReq2 submit; int32_t nextBlk; - int64_t lastTs; + int64_t lastBlkUid; SWalReader *pWalReader; SMeta *pVnodeMeta; SHashObj *tbIdHash; @@ -226,6 +226,7 @@ typedef struct STqReader { int64_t cachedSchemaUid; SSchemaWrapper *pSchemaWrapper; SSDataBlock *pResBlock; + int64_t lastTs; } STqReader; STqReader *tqReaderOpen(SVnode *pVnode); diff --git a/source/libs/executor/src/projectoperator.c b/source/libs/executor/src/projectoperator.c index 00b246afad..227960ab22 100644 --- a/source/libs/executor/src/projectoperator.c +++ b/source/libs/executor/src/projectoperator.c @@ -110,7 +110,7 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys pInfo->binfo.inputTsOrder = pProjPhyNode->node.inputTsOrder; pInfo->binfo.outputTsOrder = pProjPhyNode->node.outputTsOrder; - if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM) { + if (pTaskInfo->execModel == OPTR_EXEC_MODEL_STREAM || pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE) { pInfo->mergeDataBlocks = false; } else { if (!pProjPhyNode->ignoreGroupId) { @@ -330,6 +330,7 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) { break; } + qDebug("project return %d", pProjectInfo->mergeDataBlocks); if (pProjectInfo->mergeDataBlocks) { if (pRes->info.rows > 0) { pFinalRes->info.id.groupId = 0; // clear groupId diff --git a/tests/system-test/7-tmq/replay.py b/tests/system-test/7-tmq/replay.py new file mode 100644 index 0000000000..7eee6743a7 --- /dev/null +++ b/tests/system-test/7-tmq/replay.py @@ -0,0 +1,320 @@ + +import taos +import sys +import time +import socket +import os +import threading +from enum import Enum + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * + +class actionType(Enum): + CREATE_DATABASE = 0 + CREATE_STABLE = 1 + CREATE_CTABLE = 2 + INSERT_DATA = 3 + +class TDTestCase: + hostname = socket.gethostname() + #rpcDebugFlagVal = '143' + #clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''} + #updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal + #print ("===================: ", updatecfgDict) + + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def getBuildPath(self): + selfPath = os.path.dirname(os.path.realpath(__file__)) + + if ("community" in selfPath): + projPath = selfPath[:selfPath.find("community")] + else: + projPath = selfPath[:selfPath.find("tests")] + + for root, dirs, files in os.walk(projPath): + if ("taosd" in files or "taosd.exe" in files): + rootRealPath = os.path.dirname(os.path.realpath(root)) + if ("packaging" not in rootRealPath): + buildPath = root[:len(root) - len("/build/bin")] + break + return buildPath + + def newcur(self,cfg,host,port): + user = "root" + password = "taosdata" + con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port) + cur=con.cursor() + print(cur) + return cur + + def initConsumerTable(self,cdbName='cdb'): + tdLog.info("create consume database, and consume info table, and consume result table") + tdSql.query("create database if not exists %s vgroups 1 wal_retention_period 3600"%(cdbName)) + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("drop table if exists %s.consumeresult "%(cdbName)) + + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName) + + def initConsumerInfoTable(self,cdbName='cdb'): + tdLog.info("drop consumeinfo table") + tdSql.query("drop table if exists %s.consumeinfo "%(cdbName)) + tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName) + + def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'): + sql = "insert into %s.consumeinfo values "%cdbName + sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit) + tdLog.info("consume info sql: %s"%sql) + tdSql.query(sql) + + def selectConsumeResult(self,expectRows,cdbName='cdb'): + resultList=[] + while 1: + tdSql.query("select * from %s.consumeresult"%cdbName) + #tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3)) + if tdSql.getRows() == expectRows: + break + else: + time.sleep(5) + + for i in range(expectRows): + tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3))) + resultList.append(tdSql.getData(i , 3)) + + return resultList + + def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0): + if valgrind == 1: + logFile = cfgPath + '/../log/valgrind-tmq.log' + shellCmd = 'nohup valgrind --log-file=' + logFile + shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes ' + + if (platform.system().lower() == 'windows'): + shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> nul 2>&1 &" + else: + shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath + shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName) + shellCmd += "> /dev/null 2>&1 &" + tdLog.info(shellCmd) + os.system(shellCmd) + + def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1): + if dropFlag == 1: + tsql.execute("drop database if exists %s"%(dbName)) + + tsql.execute("create database if not exists %s vgroups %d replica %d wal_retention_period 3600"%(dbName, vgroups, replica)) + tdLog.debug("complete to create database %s"%(dbName)) + return + + def create_stable(self,tsql, dbName,stbName): + tsql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(dbName, stbName)) + tdLog.debug("complete to create %s.%s" %(dbName, stbName)) + return + + def create_ctables(self,tsql, dbName,stbName,ctbNum): + tsql.execute("use %s" %dbName) + pre_create = "create table" + sql = pre_create + #tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname)) + for i in range(ctbNum): + sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1) + if (i > 0) and (i%100 == 0): + tsql.execute(sql) + sql = pre_create + if sql != pre_create: + tsql.execute(sql) + + tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName)) + return + + def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0): + tdLog.debug("start to insert data ............") + tsql.execute("use %s" %dbName) + pre_insert = "insert into " + sql = pre_insert + + if startTs == 0: + t = time.time() + startTs = int(round(t * 1000)) + + #tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows)) + rowsOfSql = 0 + for i in range(ctbNum): + sql += " %s_%d values "%(stbName,i) + for j in range(rowsPerTbl): + sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j) + rowsOfSql += 1 + if ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)): + tsql.execute(sql) + time.sleep(1) + rowsOfSql = 0 + if j < rowsPerTbl - 1: + sql = "insert into %s_%d values " %(stbName,i) + else: + sql = "insert into " + #end sql + if sql != pre_insert: + #print("insert sql:%s"%sql) + tsql.execute(sql) + tdLog.debug("insert data ............ [OK]") + return + + def prepareEnv(self, **parameterDict): + # create new connector for my thread + tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030) + + if parameterDict["actionType"] == actionType.CREATE_DATABASE: + self.create_database(tsql, parameterDict["dbName"]) + elif parameterDict["actionType"] == actionType.CREATE_STABLE: + self.create_stable(tsql, parameterDict["dbName"], parameterDict["stbName"]) + elif parameterDict["actionType"] == actionType.CREATE_CTABLE: + self.create_ctables(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) + elif parameterDict["actionType"] == actionType.INSERT_DATA: + self.insert_data(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"], \ + parameterDict["rowsPerTbl"],parameterDict["batchNum"]) + else: + tdLog.exit("not support's action: ", parameterDict["actionType"]) + + return + + def tmqCase8(self, cfgPath, buildPath): + tdLog.printNoPrefix("======== test case 8: ") + + self.initConsumerTable() + + # create and start thread + parameterDict = {'cfg': '', \ + 'actionType': 0, \ + 'dbName': 'db8', \ + 'dropFlag': 1, \ + 'vgroups': 4, \ + 'replica': 1, \ + 'stbName': 'stb1', \ + 'ctbNum': 1, \ + 'rowsPerTbl': 10, \ + 'batchNum': 1, \ + 'startTs': 1640966400000} # 2022-01-01 00:00:00.000 + parameterDict['cfg'] = cfgPath + + self.create_database(tdSql, parameterDict["dbName"]) + self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"]) + self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"]) + self.insert_data(tdSql,\ + parameterDict["dbName"],\ + parameterDict["stbName"],\ + parameterDict["ctbNum"],\ + parameterDict["rowsPerTbl"],\ + parameterDict["batchNum"]) + + tdLog.info("create topics from stb1") + topicFromStb1 = 'topic_stb1' + + tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName'])) + consumerId = 0 + expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"] + topicList = topicFromStb1 + ifcheckdata = 0 + ifManualCommit = 1 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest,\ + enable.replay:true' + self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit) + + tdLog.info("start consume 0 processor") + pollDelay = 100 + showMsg = 1 + showRow = 1 + self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + + tdLog.info("start to check consume 0 result") + expectRows = 1 + resultList = self.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + if totalConsumeRows != 0: + tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, 0)) + tdLog.exit("tmq consume rows error!") + + # tdLog.info("start consume 1 processor") + # self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + # tdLog.sleep(2) + # + # tdLog.info("start one new thread to insert data") + # parameterDict['actionType'] = actionType.INSERT_DATA + # prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + # prepareEnvThread.start() + # prepareEnvThread.join() + # + # tdLog.info("start to check consume 0 and 1 result") + # expectRows = 2 + # resultList = self.selectConsumeResult(expectRows) + # totalConsumeRows = 0 + # for i in range(expectRows): + # totalConsumeRows += resultList[i] + # + # if totalConsumeRows != expectrowcnt: + # tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt)) + # tdLog.exit("tmq consume rows error!") + # + # tdLog.info("start consume 2 processor") + # self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow) + # tdLog.sleep(2) + # + # tdLog.info("start one new thread to insert data") + # parameterDict['actionType'] = actionType.INSERT_DATA + # prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict) + # prepareEnvThread.start() + # prepareEnvThread.join() + # + # tdLog.info("start to check consume 0 and 1 and 2 result") + # expectRows = 3 + # resultList = self.selectConsumeResult(expectRows) + # totalConsumeRows = 0 + # for i in range(expectRows): + # totalConsumeRows += resultList[i] + # + # if totalConsumeRows != expectrowcnt*2: + # tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2)) + # tdLog.exit("tmq consume rows error!") + # + # tdSql.query("drop topic %s"%topicFromStb1) + + tdLog.printNoPrefix("======== test case 8 end ...... ") + + def run(self): + tdSql.prepare() + + buildPath = self.getBuildPath() + if (buildPath == ""): + tdLog.exit("taosd not found!") + else: + tdLog.info("taosd found in %s" % buildPath) + cfgPath = buildPath + "/../sim/psim/cfg" + tdLog.info("cfgPath: %s" % cfgPath) + + self.tmqCase8(cfgPath, buildPath) + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +event = threading.Event() + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase())