Merge branch '3.0' of https://github.com/taosdata/TDengine into feature/data_format
This commit is contained in:
commit
e2003534f4
|
@ -17,6 +17,7 @@ serverName="taosd"
|
||||||
clientName="taos"
|
clientName="taos"
|
||||||
uninstallScript="rmtaos"
|
uninstallScript="rmtaos"
|
||||||
configFile="taos.cfg"
|
configFile="taos.cfg"
|
||||||
|
tarName="taos.tar.gz"
|
||||||
|
|
||||||
osType=Linux
|
osType=Linux
|
||||||
pagMode=full
|
pagMode=full
|
||||||
|
@ -242,6 +243,11 @@ function install_examples() {
|
||||||
|
|
||||||
function update_TDengine() {
|
function update_TDengine() {
|
||||||
# Start to update
|
# Start to update
|
||||||
|
if [ ! -e ${tarName} ]; then
|
||||||
|
echo "File ${tarName} does not exist"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
tar -zxf ${tarName}
|
||||||
echo -e "${GREEN}Start to update ${productName} client...${NC}"
|
echo -e "${GREEN}Start to update ${productName} client...${NC}"
|
||||||
# Stop the client shell if running
|
# Stop the client shell if running
|
||||||
if pidof ${clientName} &> /dev/null; then
|
if pidof ${clientName} &> /dev/null; then
|
||||||
|
@ -264,42 +270,49 @@ function update_TDengine() {
|
||||||
|
|
||||||
echo
|
echo
|
||||||
echo -e "\033[44;32;1m${productName} client is updated successfully!${NC}"
|
echo -e "\033[44;32;1m${productName} client is updated successfully!${NC}"
|
||||||
|
|
||||||
|
rm -rf $(tar -tf ${tarName})
|
||||||
}
|
}
|
||||||
|
|
||||||
function install_TDengine() {
|
function install_TDengine() {
|
||||||
# Start to install
|
# Start to install
|
||||||
echo -e "${GREEN}Start to install ${productName} client...${NC}"
|
if [ ! -e ${tarName} ]; then
|
||||||
|
echo "File ${tarName} does not exist"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
tar -zxf ${tarName}
|
||||||
|
echo -e "${GREEN}Start to install ${productName} client...${NC}"
|
||||||
|
|
||||||
install_main_path
|
install_main_path
|
||||||
install_log
|
install_log
|
||||||
install_header
|
install_header
|
||||||
install_lib
|
install_lib
|
||||||
install_jemalloc
|
install_jemalloc
|
||||||
if [ "$verMode" == "cluster" ]; then
|
if [ "$verMode" == "cluster" ]; then
|
||||||
install_connector
|
install_connector
|
||||||
fi
|
fi
|
||||||
install_examples
|
install_examples
|
||||||
install_bin
|
install_bin
|
||||||
install_config
|
install_config
|
||||||
|
|
||||||
echo
|
echo
|
||||||
echo -e "\033[44;32;1m${productName} client is installed successfully!${NC}"
|
echo -e "\033[44;32;1m${productName} client is installed successfully!${NC}"
|
||||||
|
|
||||||
rm -rf $(tar -tf ${tarName})
|
rm -rf $(tar -tf ${tarName})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
## ==============================Main program starts from here============================
|
## ==============================Main program starts from here============================
|
||||||
# Install or updata client and client
|
# Install or updata client and client
|
||||||
# if server is already install, don't install client
|
# if server is already install, don't install client
|
||||||
if [ -e ${bin_dir}/${serverName} ]; then
|
if [ -e ${bin_dir}/${serverName} ]; then
|
||||||
echo -e "\033[44;32;1mThere are already installed ${productName} server, so don't need install client!${NC}"
|
echo -e "\033[44;32;1mThere are already installed ${productName} server, so don't need install client!${NC}"
|
||||||
exit 0
|
exit 0
|
||||||
fi
|
fi
|
||||||
|
|
||||||
if [ -x ${bin_dir}/${clientName} ]; then
|
if [ -x ${bin_dir}/${clientName} ]; then
|
||||||
update_flag=1
|
update_flag=1
|
||||||
update_TDengine
|
update_TDengine
|
||||||
else
|
else
|
||||||
install_TDengine
|
install_TDengine
|
||||||
fi
|
fi
|
||||||
|
|
|
@ -138,7 +138,7 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
|
||||||
pCfg->dbId = pCreate->dbUid;
|
pCfg->dbId = pCreate->dbUid;
|
||||||
pCfg->szPage = pCreate->pageSize * 1024;
|
pCfg->szPage = pCreate->pageSize * 1024;
|
||||||
pCfg->szCache = pCreate->pages;
|
pCfg->szCache = pCreate->pages;
|
||||||
pCfg->szBuf = pCreate->buffer * 1024 * 1024;
|
pCfg->szBuf = (uint64_t)pCreate->buffer * 1024 * 1024;
|
||||||
pCfg->isWeak = true;
|
pCfg->isWeak = true;
|
||||||
pCfg->tsdbCfg.compression = pCreate->compression;
|
pCfg->tsdbCfg.compression = pCreate->compression;
|
||||||
pCfg->tsdbCfg.precision = pCreate->precision;
|
pCfg->tsdbCfg.precision = pCreate->precision;
|
||||||
|
|
|
@ -419,6 +419,8 @@ static int32_t mndProcessSubscribeReq(SRpcMsg *pMsg) {
|
||||||
SMqTopicObj topicObj = {0};
|
SMqTopicObj topicObj = {0};
|
||||||
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
|
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
|
||||||
topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1;
|
topicObj.refConsumerCnt = pTopic->refConsumerCnt + 1;
|
||||||
|
mInfo("subscribe topic %s by consumer %ld cgroup %s, refcnt %d", pTopic->name, consumerId, cgroup,
|
||||||
|
topicObj.refConsumerCnt);
|
||||||
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
|
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto SUBSCRIBE_OVER;
|
||||||
|
|
||||||
mndReleaseTopic(pMnode, pTopic);
|
mndReleaseTopic(pMnode, pTopic);
|
||||||
|
|
|
@ -417,7 +417,7 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
||||||
|
|
||||||
// 2. redo log: subscribe and vg assignment
|
// 2. redo log: subscribe and vg assignment
|
||||||
// subscribe
|
// subscribe
|
||||||
if (mndSetSubRedoLogs(pMnode, pTrans, pOutput->pSub) != 0) {
|
if (mndSetSubCommitLogs(pMnode, pTrans, pOutput->pSub) != 0) {
|
||||||
goto REB_FAIL;
|
goto REB_FAIL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -479,6 +479,10 @@ static int32_t mndPersistRebResult(SMnode *pMnode, SRpcMsg *pMsg, const SMqRebOu
|
||||||
SMqTopicObj topicObj = {0};
|
SMqTopicObj topicObj = {0};
|
||||||
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
|
memcpy(&topicObj, pTopic, sizeof(SMqTopicObj));
|
||||||
topicObj.refConsumerCnt = pTopic->refConsumerCnt - consumerNum;
|
topicObj.refConsumerCnt = pTopic->refConsumerCnt - consumerNum;
|
||||||
|
// TODO is that correct?
|
||||||
|
pTopic->refConsumerCnt = topicObj.refConsumerCnt;
|
||||||
|
mInfo("subscribe topic %s unref %d consumer cgroup %s, refcnt %d", pTopic->name, consumerNum, cgroup,
|
||||||
|
topicObj.refConsumerCnt);
|
||||||
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL;
|
if (mndSetTopicCommitLogs(pMnode, pTrans, &topicObj) != 0) goto REB_FAIL;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -93,7 +93,7 @@ class TDTestCase:
|
||||||
tdLog.info(shellCmd)
|
tdLog.info(shellCmd)
|
||||||
os.system(shellCmd)
|
os.system(shellCmd)
|
||||||
|
|
||||||
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum,rowsPerTbl):
|
def create_tables(self,tsql, dbName,vgroups,stbName,ctbNum):
|
||||||
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
tsql.execute("create database if not exists %s vgroups %d"%(dbName, vgroups))
|
||||||
tsql.execute("use %s" %dbName)
|
tsql.execute("use %s" %dbName)
|
||||||
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
|
tsql.execute("create table if not exists %s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%stbName)
|
||||||
|
@ -147,8 +147,7 @@ class TDTestCase:
|
||||||
parameterDict["dbName"],\
|
parameterDict["dbName"],\
|
||||||
parameterDict["vgroups"],\
|
parameterDict["vgroups"],\
|
||||||
parameterDict["stbName"],\
|
parameterDict["stbName"],\
|
||||||
parameterDict["ctbNum"],\
|
parameterDict["ctbNum"])
|
||||||
parameterDict["rowsPerTbl"])
|
|
||||||
|
|
||||||
self.insert_data(tsql,\
|
self.insert_data(tsql,\
|
||||||
parameterDict["dbName"],\
|
parameterDict["dbName"],\
|
||||||
|
@ -322,6 +321,75 @@ class TDTestCase:
|
||||||
|
|
||||||
tdLog.printNoPrefix("======== test case 2 end ...... ")
|
tdLog.printNoPrefix("======== test case 2 end ...... ")
|
||||||
|
|
||||||
|
def tmqCase2a(self, cfgPath, buildPath):
|
||||||
|
tdLog.printNoPrefix("======== test case 2a: Produce while two consumers to subscribe one db, inclue 1 stb")
|
||||||
|
tdLog.info("step 1: create database, stb, ctb and insert data")
|
||||||
|
# create and start thread
|
||||||
|
parameterDict = {'cfg': '', \
|
||||||
|
'dbName': 'db2a', \
|
||||||
|
'vgroups': 4, \
|
||||||
|
'stbName': 'stb1', \
|
||||||
|
'ctbNum': 10, \
|
||||||
|
'rowsPerTbl': 10000, \
|
||||||
|
'batchNum': 100, \
|
||||||
|
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||||
|
parameterDict['cfg'] = cfgPath
|
||||||
|
|
||||||
|
self.initConsumerTable()
|
||||||
|
|
||||||
|
tdSql.execute("create database if not exists %s vgroups %d" %(parameterDict['dbName'], parameterDict['vgroups']))
|
||||||
|
tdSql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(parameterDict['dbName'], parameterDict['stbName']))
|
||||||
|
|
||||||
|
tdLog.info("create topics from db")
|
||||||
|
topicName1 = 'topic_db1'
|
||||||
|
|
||||||
|
tdSql.execute("create topic %s as %s" %(topicName1, parameterDict['dbName']))
|
||||||
|
|
||||||
|
consumerId = 0
|
||||||
|
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||||
|
topicList = topicName1
|
||||||
|
ifcheckdata = 0
|
||||||
|
ifManualCommit = 1
|
||||||
|
keyList = 'group.id:cgrp1,\
|
||||||
|
enable.auto.commit:false,\
|
||||||
|
auto.commit.interval.ms:6000,\
|
||||||
|
auto.offset.reset:earliest'
|
||||||
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
consumerId = 1
|
||||||
|
keyList = 'group.id:cgrp2,\
|
||||||
|
enable.auto.commit:false,\
|
||||||
|
auto.commit.interval.ms:6000,\
|
||||||
|
auto.offset.reset:earliest'
|
||||||
|
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||||
|
|
||||||
|
tdLog.info("start consume processor")
|
||||||
|
pollDelay = 10
|
||||||
|
showMsg = 1
|
||||||
|
showRow = 1
|
||||||
|
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||||
|
|
||||||
|
prepareEnvThread = threading.Thread(target=self.prepareEnv, kwargs=parameterDict)
|
||||||
|
prepareEnvThread.start()
|
||||||
|
|
||||||
|
# wait for data ready
|
||||||
|
prepareEnvThread.join()
|
||||||
|
|
||||||
|
tdLog.info("insert process end, and start to check consume result")
|
||||||
|
expectRows = 2
|
||||||
|
resultList = self.selectConsumeResult(expectRows)
|
||||||
|
totalConsumeRows = 0
|
||||||
|
for i in range(expectRows):
|
||||||
|
totalConsumeRows += resultList[i]
|
||||||
|
|
||||||
|
if totalConsumeRows != expectrowcnt * 2:
|
||||||
|
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt*2))
|
||||||
|
tdLog.exit("tmq consume rows error!")
|
||||||
|
|
||||||
|
tdSql.query("drop topic %s"%topicName1)
|
||||||
|
|
||||||
|
tdLog.printNoPrefix("======== test case 2a end ...... ")
|
||||||
|
|
||||||
def tmqCase3(self, cfgPath, buildPath):
|
def tmqCase3(self, cfgPath, buildPath):
|
||||||
tdLog.printNoPrefix("======== test case 3: Produce while one consumers to subscribe one db, include 2 stb")
|
tdLog.printNoPrefix("======== test case 3: Produce while one consumers to subscribe one db, include 2 stb")
|
||||||
tdLog.info("step 1: create database, stb, ctb and insert data")
|
tdLog.info("step 1: create database, stb, ctb and insert data")
|
||||||
|
@ -745,6 +813,7 @@ class TDTestCase:
|
||||||
|
|
||||||
self.tmqCase1(cfgPath, buildPath)
|
self.tmqCase1(cfgPath, buildPath)
|
||||||
self.tmqCase2(cfgPath, buildPath)
|
self.tmqCase2(cfgPath, buildPath)
|
||||||
|
self.tmqCase2a(cfgPath, buildPath)
|
||||||
self.tmqCase3(cfgPath, buildPath)
|
self.tmqCase3(cfgPath, buildPath)
|
||||||
self.tmqCase4(cfgPath, buildPath)
|
self.tmqCase4(cfgPath, buildPath)
|
||||||
self.tmqCase5(cfgPath, buildPath)
|
self.tmqCase5(cfgPath, buildPath)
|
||||||
|
|
Loading…
Reference in New Issue