test: add tmq test case
This commit is contained in:
parent
2f64f0dce7
commit
65e7c9ee59
|
@ -0,0 +1,316 @@
|
|||
|
||||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
from enum import Enum
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
sys.path.append("./7-tmq")
|
||||
from tmqCommon import *
|
||||
|
||||
class TDTestCase:
|
||||
def __init__(self):
|
||||
self.vgroups = 1
|
||||
self.ctbNum = 100
|
||||
self.rowsPerTbl = 10000
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor(), False)
|
||||
|
||||
def prepareTestEnv(self):
|
||||
tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 1,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 1,
|
||||
'rowsPerTbl': 100000,
|
||||
'batchNum': 100,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 3,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||
tdLog.info("create ctb")
|
||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tdLog.info("insert data")
|
||||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
tdLog.info("restart taosd to ensure that the data falls into the disk")
|
||||
# tdDnodes.stop(1)
|
||||
# tdDnodes.start(1)
|
||||
tdSql.query("flush database %s"%(paraDict['dbName']))
|
||||
return
|
||||
|
||||
def tmqCase1(self):
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 1,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 1,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 100,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 5,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||
tdLog.info("create ctb")
|
||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tdLog.info("insert data")
|
||||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, paraDict['dbName'], paraDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 0
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:true,\
|
||||
auto.commit.interval.ms:1000,\
|
||||
auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
# time.sleep(3)
|
||||
tmqCom.getStartCommitNotifyFromTmqsim()
|
||||
tdLog.info("================= restart dnode ===========================")
|
||||
tdDnodes.stop(1)
|
||||
tdDnodes.start(1)
|
||||
time.sleep(5)
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
|
||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
||||
def tmqCase2(self):
|
||||
tdLog.printNoPrefix("======== test case 2: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 1,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 1,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 100,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 3,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 1}
|
||||
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||
tdLog.info("create ctb")
|
||||
tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"],ctbPrefix=paraDict['ctbPrefix'],
|
||||
ctbNum=paraDict["ctbNum"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tdLog.info("insert data")
|
||||
tmqCom.insert_data_interlaceByMultiTbl(tsql=tdSql,dbName=paraDict["dbName"],ctbPrefix=paraDict["ctbPrefix"],
|
||||
ctbNum=paraDict["ctbNum"],rowsPerTbl=paraDict["rowsPerTbl"],batchNum=paraDict["batchNum"],
|
||||
startTs=paraDict["startTs"],ctbStartIdx=paraDict['ctbStartIdx'])
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, paraDict['dbName'], paraDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 0
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:true,\
|
||||
auto.commit.interval.ms:1000,\
|
||||
auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
tdLog.info("create some new child table and insert data ")
|
||||
paraDict['batchNum'] = 100
|
||||
tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
|
||||
|
||||
tmqCom.getStartCommitNotifyFromTmqsim()
|
||||
tdLog.info("================= restart dnode ===========================")
|
||||
tdDnodes.stop(1)
|
||||
tdDnodes.start(1)
|
||||
time.sleep(5)
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
|
||||
tdLog.printNoPrefix("======== test case 2 end ...... ")
|
||||
|
||||
# 自动建表完成数据插入,启动消费
|
||||
def tmqCase3(self):
|
||||
tdLog.printNoPrefix("======== test case 3: ")
|
||||
paraDict = {'dbName': 'dbt',
|
||||
'dropFlag': 1,
|
||||
'event': '',
|
||||
'vgroups': 1,
|
||||
'stbName': 'stb',
|
||||
'colPrefix': 'c',
|
||||
'tagPrefix': 't',
|
||||
'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
|
||||
'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}],
|
||||
'ctbPrefix': 'ctb',
|
||||
'ctbStartIdx': 0,
|
||||
'ctbNum': 1,
|
||||
'rowsPerTbl': 10000,
|
||||
'batchNum': 100,
|
||||
'startTs': 1640966400000, # 2022-01-01 00:00:00.000
|
||||
'pollDelay': 3,
|
||||
'showMsg': 1,
|
||||
'showRow': 1,
|
||||
'snapshot': 0}
|
||||
|
||||
paraDict['vgroups'] = self.vgroups
|
||||
paraDict['ctbNum'] = self.ctbNum
|
||||
paraDict['rowsPerTbl'] = self.rowsPerTbl
|
||||
|
||||
tmqCom.initConsumerTable()
|
||||
tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=paraDict["vgroups"],replica=1)
|
||||
tdLog.info("create stb")
|
||||
tmqCom.create_stable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"])
|
||||
tdLog.info("insert data by auto create ctb")
|
||||
tmqCom.insert_data_with_autoCreateTbl(tdSql,paraDict["dbName"],paraDict["stbName"],"ctb",paraDict["ctbNum"],paraDict["rowsPerTbl"],paraDict["batchNum"])
|
||||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, paraDict['dbName'], paraDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"]
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 0
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:true,\
|
||||
auto.commit.interval.ms:1000,\
|
||||
auto.offset.reset:earliest'
|
||||
tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'],dbName=paraDict["dbName"],showMsg=paraDict['showMsg'], showRow=paraDict['showRow'],snapshot=paraDict['snapshot'])
|
||||
|
||||
# tdLog.info("================= restart dnode ===========================")
|
||||
# tdDnodes.stop(1)
|
||||
# tdDnodes.start(1)
|
||||
# time.sleep(2)
|
||||
|
||||
tdLog.info("insert process end, and start to check consume result")
|
||||
expectRows = 1
|
||||
resultList = tmqCom.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
if totalConsumeRows != expectrowcnt:
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
|
||||
tdLog.printNoPrefix("======== test case 3 end ...... ")
|
||||
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
|
||||
# self.tmqCase1()
|
||||
# self.tmqCase2()
|
||||
self.tmqCase3()
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
event = threading.Event()
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -365,15 +365,21 @@ class TMQCom:
|
|||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||
rowsBatched = 0
|
||||
for i in range(ctbNum):
|
||||
sql += " %s.%s_%d using %s.%s tags (%d) values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i)
|
||||
tagBinaryValue = 'beijing'
|
||||
if (i % 2 == 0):
|
||||
tagBinaryValue = 'shanghai'
|
||||
elif (i % 3 == 0):
|
||||
tagBinaryValue = 'changsha'
|
||||
|
||||
sql += " %s.%s_%d using %s.%s tags (%d, %d, %d, '%s', '%s') values "%(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
|
||||
for j in range(rowsPerTbl):
|
||||
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
|
||||
sql += "(%d, %d, %d, %d, 'binary_%d', 'nchar_%d', now) "%(startTs+j, j,j, j,i+ctbStartIdx,rowsBatched)
|
||||
rowsBatched += 1
|
||||
if ((rowsBatched == batchNum) or (j == rowsPerTbl - 1)):
|
||||
tsql.execute(sql)
|
||||
rowsBatched = 0
|
||||
if j < rowsPerTbl - 1:
|
||||
sql = "insert into %s.%s_%d using %s.%s tags (%d) values " %(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i)
|
||||
sql = "insert into %s.%s_%d using %s.%s tags (%d, %d, %d, '%s', '%s') values " %(dbName,ctbPrefix,i+ctbStartIdx,dbName,stbName,i+ctbStartIdx,i+ctbStartIdx,i+ctbStartIdx,tagBinaryValue,tagBinaryValue)
|
||||
else:
|
||||
sql = "insert into "
|
||||
#end sql
|
||||
|
|
|
@ -52,6 +52,7 @@ typedef struct {
|
|||
// char autoOffsetRest[16]; // none, earliest, latest
|
||||
|
||||
TdFilePtr pConsumeRowsFile;
|
||||
TdFilePtr pConsumeMetaFile;
|
||||
int32_t ifCheckData;
|
||||
int64_t expectMsgCnt;
|
||||
|
||||
|
@ -445,7 +446,7 @@ static void dumpToFileForCheck(TdFilePtr pFile, TAOS_ROW row, TAOS_FIELD* fields
|
|||
taosFprintfFile(pFile, "\n");
|
||||
}
|
||||
|
||||
static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
|
||||
static int32_t data_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
|
||||
char buf[1024];
|
||||
int32_t totalRows = 0;
|
||||
|
||||
|
@ -496,6 +497,52 @@ static int32_t msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex)
|
|||
return totalRows;
|
||||
}
|
||||
|
||||
|
||||
static int32_t meta_msg_process(TAOS_RES* msg, SThreadInfo* pInfo, int32_t msgIndex) {
|
||||
char buf[1024];
|
||||
int32_t totalRows = 0;
|
||||
|
||||
// printf("topic: %s\n", tmq_get_topic_name(msg));
|
||||
int32_t vgroupId = tmq_get_vgroup_id(msg);
|
||||
const char* dbName = tmq_get_db_name(msg);
|
||||
|
||||
taosFprintfFile(g_fp, "consumerId: %d, msg index:%" PRId64 "\n", pInfo->consumerId, msgIndex);
|
||||
taosFprintfFile(g_fp, "dbName: %s, topic: %s, vgroupId: %d\n", dbName != NULL ? dbName : "invalid table",
|
||||
tmq_get_topic_name(msg), vgroupId);
|
||||
|
||||
{
|
||||
tmq_raw_data *raw = tmq_get_raw_meta(msg);
|
||||
|
||||
if(raw){
|
||||
TAOS_RES* pRes = taos_query(pInfo->taos, "use metadb");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
pError("error when use metadb, reason:%s\n", taos_errstr(pRes));
|
||||
taosFprintfFile(g_fp, "error when use metadb, reason:%s\n", taos_errstr(pRes));
|
||||
taosCloseFile(&g_fp);
|
||||
taos_free_result(pRes);
|
||||
exit(-1);
|
||||
}
|
||||
taos_free_result(pRes);
|
||||
taosFprintfFile(g_fp, "raw:%p\n", raw);
|
||||
|
||||
int32_t ret = taos_write_raw_meta(pInfo->taos, raw);
|
||||
taosMemoryFree(raw);
|
||||
}
|
||||
|
||||
char* result = tmq_get_json_meta(msg);
|
||||
if(result){
|
||||
//printf("meta result: %s\n", result);
|
||||
taosFprintfFile(pInfo->pConsumeMetaFile, "%s\n", result);
|
||||
taosMemoryFree(result);
|
||||
}
|
||||
}
|
||||
|
||||
totalRows++;
|
||||
|
||||
return totalRows;
|
||||
}
|
||||
|
||||
|
||||
int queryDB(TAOS* taos, char* command) {
|
||||
TAOS_RES* pRes = taos_query(taos, command);
|
||||
int code = taos_errno(pRes);
|
||||
|
@ -526,7 +573,7 @@ int32_t notifyMainScript(SThreadInfo* pInfo, int32_t cmdId) {
|
|||
|
||||
static int32_t g_once_commit_flag = 0;
|
||||
static void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
|
||||
pError("tmq_commit_cb_print() commit %d\n", code);
|
||||
taosFprintfFile(g_fp, "tmq_commit_cb_print() commit %d\n", code);
|
||||
|
||||
if (0 == g_once_commit_flag) {
|
||||
g_once_commit_flag = 1;
|
||||
|
@ -630,8 +677,12 @@ void loop_consume(SThreadInfo* pInfo) {
|
|||
// getCurrentTimeString(tmpString));
|
||||
sprintf(filename, "%s/../log/consumerid_%d.txt", configDir, pInfo->consumerId);
|
||||
pInfo->pConsumeRowsFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||
if (pInfo->pConsumeRowsFile == NULL) {
|
||||
taosFprintfFile(g_fp, "%s create file fail for save rows content\n", getCurrentTimeString(tmpString));
|
||||
|
||||
sprintf(filename, "%s/../log/meta_consumerid_%d.txt", configDir, pInfo->consumerId);
|
||||
pInfo->pConsumeMetaFile = taosOpenFile(filename, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_TRUNC | TD_FILE_STREAM);
|
||||
|
||||
if (pInfo->pConsumeRowsFile == NULL || pInfo->pConsumeMetaFile == NULL) {
|
||||
taosFprintfFile(g_fp, "%s create file fail for save rows or save meta\n", getCurrentTimeString(tmpString));
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -645,7 +696,11 @@ void loop_consume(SThreadInfo* pInfo) {
|
|||
TAOS_RES* tmqMsg = tmq_consumer_poll(pInfo->tmq, consumeDelay);
|
||||
if (tmqMsg) {
|
||||
if (0 != g_stConfInfo.showMsgFlag) {
|
||||
totalRows += msg_process(tmqMsg, pInfo, totalMsgs);
|
||||
tmq_res_t msgType = tmq_get_res_type(tmqMsg);
|
||||
if (msgType == TMQ_RES_TABLE_META) {
|
||||
totalRows += meta_msg_process(tmqMsg, pInfo, totalMsgs);
|
||||
} else if (msgType == TMQ_RES_DATA)
|
||||
totalRows += data_msg_process(tmqMsg, pInfo, totalMsgs);
|
||||
}
|
||||
|
||||
taos_free_result(tmqMsg);
|
||||
|
|
Loading…
Reference in New Issue