Merge branch 'main' into fix/liaohj_main
commit 8a6b07347c
@@ -225,6 +225,9 @@ DLL_EXPORT int taos_get_tables_vgId(TAOS *taos, const char *db, const char *tabl
 DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList);
 
+// set heart beat thread quit mode: if quitByKill is 1 then kill the thread, else let it quit on its own
+DLL_EXPORT void taos_set_hb_quit(int8_t quitByKill);
+
 /* --------------------------schemaless INTERFACE------------------------------- */
 
 DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision);
@@ -42,8 +42,9 @@ if [ "$DISABLE_ADAPTER" = "0" ]; then
     done
 fi
 
-# if has mnode ep set or the host is first ep or not for cluster, just start.
-if [ -f "$DATA_DIR/dnode/mnodeEpSet.json" ] ||
+# if dnode has been created or has mnode ep set or the host is first ep or not for cluster, just start.
+if [ -f "$DATA_DIR/dnode/dnode.json" ] ||
+    [ -f "$DATA_DIR/dnode/mnodeEpSet.json" ] ||
     [ "$TAOS_FQDN" = "$FIRST_EP_HOST" ]; then
     $@
 # others will first wait the first ep ready.
@@ -80,6 +80,7 @@ typedef struct {
   int64_t appId;
   // ctl
   int8_t threadStop;
+  int8_t quitByKill;
   TdThread thread;
   TdThreadMutex lock;  // used when app init and cleanup
   SHashObj* appSummary;
@@ -845,7 +845,12 @@ static void hbStopThread() {
     return;
   }
 
-  taosThreadJoin(clientHbMgr.thread, NULL);
+  // thread quit mode: kill it, or let it exit from inside the thread itself
+  if (clientHbMgr.quitByKill) {
+    taosThreadKill(clientHbMgr.thread, 0);
+  } else {
+    taosThreadJoin(clientHbMgr.thread, NULL);
+  }
 
   tscDebug("hb thread stopped");
 }
@@ -1037,3 +1042,8 @@ void hbDeregisterConn(SAppHbMgr *pAppHbMgr, SClientHbKey connKey) {
 
   atomic_sub_fetch_32(&pAppHbMgr->connKeyCnt, 1);
 }
+
+// set heart beat thread quit mode: if quitByKill is 1 then kill the thread, else let it quit on its own
+void taos_set_hb_quit(int8_t quitByKill) {
+  clientHbMgr.quitByKill = quitByKill;
+}
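For context, a minimal client-side sketch of how the new switch can be used; the shell's main() hunk at the end of this diff does exactly this, and the connection parameters here are placeholders:

#include "taos.h"

int main(void) {
  taos_init();

  // ask the client library to kill the heartbeat thread on shutdown
  // instead of joining it (joining stays the default in this commit)
  taos_set_hb_quit(1);

  TAOS *conn = taos_connect("localhost", "root", "taosdata", NULL, 0);
  if (conn != NULL) {
    // ... run queries ...
    taos_close(conn);
  }

  taos_cleanup();
  return 0;
}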
@@ -2503,9 +2503,11 @@ _exit:
 int32_t tColDataAddValueByBind(SColData *pColData, TAOS_MULTI_BIND *pBind) {
   int32_t code = 0;
 
-  ASSERT(pColData->type == pBind->buffer_type);
-
-  if (IS_VAR_DATA_TYPE(pBind->buffer_type)) {  // var-length data type
+  if (!(pBind->num == 1 && pBind->is_null && *pBind->is_null)) {
+    ASSERT(pColData->type == pBind->buffer_type);
+  }
+
+  if (IS_VAR_DATA_TYPE(pColData->type)) {  // var-length data type
     for (int32_t i = 0; i < pBind->num; ++i) {
       if (pBind->is_null && pBind->is_null[i]) {
         code = tColDataAppendValueImpl[pColData->flag][CV_FLAG_NULL](pColData, NULL, 0);
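A minimal sketch of the bind case this relaxed assertion targets: a connector binding a single NULL cell may leave buffer_type unset, so the strict type check is skipped for exactly that case. Field names follow TAOS_MULTI_BIND from taos.h; the helper itself is hypothetical.

#include <string.h>
#include "taos.h"

// bind exactly one NULL row; buffer_type may not match the column type,
// which the new (num == 1 && is_null && *is_null) condition tolerates
static void bind_single_null(TAOS_MULTI_BIND *bind) {
  static char isNull = 1;
  memset(bind, 0, sizeof(*bind));
  bind->num = 1;            // exactly one row
  bind->is_null = &isNull;  // and it is NULL
  // bind->buffer_type intentionally left at 0 here
}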
@@ -1437,7 +1437,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
   return 0;
 }
 
-static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, SRpcMsg *pReq) {
+static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb) {
   SSdb   *pSdb = pMnode->pSdb;
   SVgObj *pVgroup = NULL;
   void   *pIter = NULL;
@@ -1459,7 +1459,7 @@ static int32_t mndTrimDb(SMnode *pMnode, SDbObj *pDb, SRpcMsg *pReq) {
     pHead->vgId = htonl(pVgroup->vgId);
     tSerializeSVTrimDbReq((char *)pHead + sizeof(SMsgHead), contLen, &trimReq);
 
-    SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen, .info = pReq->info};
+    SRpcMsg rpcMsg = {.msgType = TDMT_VND_TRIM, .pCont = pHead, .contLen = contLen};
     SEpSet  epSet = mndGetVgroupEpset(pMnode, pVgroup);
     int32_t code = tmsgSendReq(&epSet, &rpcMsg);
     if (code != 0) {
@@ -1495,7 +1495,7 @@ static int32_t mndProcessTrimDbReq(SRpcMsg *pReq) {
     goto _OVER;
   }
 
-  code = mndTrimDb(pMnode, pDb, pReq);
+  code = mndTrimDb(pMnode, pDb);
 
 _OVER:
   if (code != 0) {
@@ -251,7 +251,7 @@ int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, in
       goto _return;
     }
 
-    if (bind[c].buffer_type != pColSchema->type) {
+    if ((!(rowNum == 1 && bind[c].is_null && *bind[c].is_null)) && bind[c].buffer_type != pColSchema->type) {  // for rowNum == 1, the connector may not set buffer_type
       code = buildInvalidOperationMsg(&pBuf, "column type mis-match with buffer type");
       goto _return;
     }
@@ -692,7 +692,7 @@ SToken tStrGetToken(const char* str, int32_t* i, bool isPrevOptr, bool* pIgnoreC
       len = tGetToken(&str[*i + t0.n + 1], &type);
 
       // only id and string are valid
-      if ((TK_NK_STRING != t0.type) && (TK_NK_ID != t0.type)) {
+      if (((TK_NK_STRING != t0.type) && (TK_NK_ID != t0.type)) || ((TK_NK_STRING != type) && (TK_NK_ID != type))) {
        t0.type = TK_NK_ILLEGAL;
        t0.n = 0;
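A hedged restatement of the tightened check as a standalone predicate: a dotted name such as db.table is now rejected unless both the token before the dot (t0.type) and the token after it (the local type) are an identifier or a string. The helper and its placeholder token constants are hypothetical, standing in for the parser's TK_NK_ID / TK_NK_STRING enums.

#include <stdbool.h>

// placeholder values standing in for the parser's token-type enums
enum { TOK_ID = 1, TOK_STRING = 2 };

// prevType corresponds to t0.type (before the dot), nextType to 'type' (after the dot)
static bool dotted_name_ok(int prevType, int nextType) {
  bool prevOk = (prevType == TOK_STRING) || (prevType == TOK_ID);
  bool nextOk = (nextType == TOK_STRING) || (nextType == TOK_ID);
  return prevOk && nextOk;  // before this change only prevOk was required
}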
@@ -68,8 +68,8 @@ docker run \
     -v ${REP_REAL_PATH}/community/contrib/libuv/:${REP_DIR}/community/contrib/libuv \
     -v ${REP_REAL_PATH}/community/contrib/lz4/:${REP_DIR}/community/contrib/lz4 \
     -v ${REP_REAL_PATH}/community/contrib/zlib/:${REP_DIR}/community/contrib/zlib \
+    -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
     --rm --ulimit core=-1 taos_test:v1.0 sh -c "pip uninstall taospy -y;pip3 install taospy==2.7.2;cd $REP_DIR;rm -rf debug;mkdir -p debug;cd debug;cmake .. -DBUILD_HTTP=false -DBUILD_TOOLS=true -DBUILD_TEST=true -DWEBSOCKET=true -DBUILD_TAOSX=true -DJEMALLOC_ENABLED=true;make -j || exit 1"
-    # -v ${REP_REAL_PATH}/community/contrib/jemalloc/:${REP_DIR}/community/contrib/jemalloc \
 
 if [[ -d ${WORKDIR}/debugNoSan ]] ;then
     echo "delete ${WORKDIR}/debugNoSan"
@@ -0,0 +1,71 @@
+drop database if exists d0;
+create database d0 replica 1 keep 365 minRows 100 maxRows 4096 comp 2 vgroups 2 precision 'ms';
+use d0;
+create table if not exists almlog (starttime timestamp,endtime timestamp,durationtime int, alarmno int, alarmtext nchar(256),isactive nchar(64)) tags (mcid nchar(16));
+create table if not exists mplog (starttime timestamp,mpid int, paravalue nchar(256),mptype nchar(32)) tags (mcid nchar(16));
+create table if not exists mdlog (starttime timestamp,endtime timestamp,durationtime int, statuscode int, npcgmname nchar(256),attr int) tags (mcid nchar(16));
+create table if not exists nrglog (updatetime timestamp,energyvalue double,enerygyincrease double) tags (mcid nchar(16),enerygytype nchar(16));
+
+create table almlog_m201 using almlog tags("m201");
+create table almlog_m0103 using almlog tags("m0103");
+create table almlog_m0103_20031 using almlog tags("m0103");
+create table almlog_m0103_20032 using almlog tags("m0103");
+create table almlog_m0103_3003 using almlog tags("m0103");
+create table almlog_m0103_20033 using almlog tags("m0103");
+create table almlog_m0103_30031 using almlog tags("m0103");
+create table almlog_m0201 using almlog tags("m0201");
+create table almlog_m0102 using almlog tags("m0102");
+create table almlog_m0101 using almlog tags("m0101");
+create table almlog_m1002 using almlog tags("m1002");
+
+create table mplog_m0204_4 using mplog tags("m0204");
+create table mplog_m0204_5 using mplog tags("m0204");
+create table mplog_m0204_6 using mplog tags("m0204");
+create table mplog_m0204_12 using mplog tags("m0204");
+create table mplog_m0204 using mplog tags("m0204");
+create table mplog_m201 using mplog tags("m201");
+create table mplog_m0102 using mplog tags("m0102");
+create table mplog_m1101 using mplog tags("m1101");
+
+create table mdlog_m0102 using mplog tags("m0102");
+create table mdlog_m0504 using mplog tags("m0504");
+create table mdlog_m0505 using mplog tags("m0505");
+create table mdlog_m0507 using mplog tags("m0507");
+create table mdlog_m1002 using mplog tags("m1002");
+create table mdlog_m3201 using mplog tags("m3201");
+create table mdlog_m0201 using mplog tags("m0201");
+create table mdlog_m1102 using mplog tags("m1102");
+create table mdlog_m201 using mplog tags("m201");
+
+create table nrglog_m201_electricvalue1 using nrglog tags("m201","electricValue1");
+create table nrglog_m201_oilvalue1 using nrglog tags("m201","oilValue1");
+create table nrglog_m201_gasvalue1 using nrglog tags("m201","gasValue1");
+create table nrglog_m201_watervalue1 using nrglog tags("m201","waterValue1");
+create table nrglog_m0101_oilvalue1 using nrglog tags("m0101","oilValue1");
+create table nrglog_m0101_watervalue1 using nrglog tags("m0101","waterValue1");
+create table nrglog_m0102_gasvalue1 using nrglog tags("m0102","gasValue1");
+create table nrglog_m1903 using nrglog tags("m1903",NULL);
+create table nrglog_m2802 using nrglog tags("m2802",NULL);
+create table nrglog_m2101 using nrglog tags("m2101",NULL);
+create table nrglog_m0102 using nrglog tags("m0102",NULL);
+create table nrglog_m0101_electricvalue1 using nrglog tags("m0101","electricValue1");
+create table nrglog_m0101_gasvalue1 using nrglog tags("m0101","gasValue1");
+create table nrglog_m0102_electricvalue1 using nrglog tags("m0102","electricValue1");
+create table nrglog_m0102_oilvalue1 using nrglog tags("m0102","oilValue1");
+create table nrglog_m0102_watervalue1 using nrglog tags("m0102","waterValue1");
+
+
+insert into almlog_m0103 values(now,now+1s,10,0,'','dismissed');
+insert into almlog_m0103_20031 values(now,now+1s,10,20031,'','dismissed');
+insert into almlog_m0103_20032 values(now,now+1s,10,20032,'','dismissed');
+insert into almlog_m0103_3003 values(now,now+1s,10,3003,'','dismissed');
+insert into almlog_m0103_20033 values(now,now+1s,10,20033,'','dismissed');
+insert into almlog_m0103_30031 values(now,now+1s,10,30031,'','dismissed');
+
+flush database d0;
+
+show table tags from almlog;
+
+select *,tbname from d0.almlog where mcid='m0103';
+
+select table_name from information_schema.ins_tables where db_name='d0';
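A Python case later in this diff runs this script through the taos shell (taos -f 0-others/TS-3131.tsql). As a side note, the same verification query could also be issued through the C client; a minimal, hedged sketch with placeholder connection parameters:

#include <stdio.h>
#include "taos.h"

int main(void) {
  TAOS *conn = taos_connect("localhost", "root", "taosdata", "d0", 0);
  if (conn == NULL) return 1;

  TAOS_RES *res = taos_query(conn, "select *,tbname from d0.almlog where mcid='m0103'");
  if (taos_errno(res) != 0) {
    printf("query failed: %s\n", taos_errstr(res));
  } else {
    int rows = 0;
    while (taos_fetch_row(res) != NULL) rows++;
    printf("got %d rows\n", rows);  // the test below expects 6 rows
  }
  taos_free_result(res);

  taos_close(conn);
  return 0;
}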
@@ -53,18 +53,8 @@
             "sample_format": "csv",
             "sample_file": "./sample.csv",
             "tags_file": "",
-            "columns": [
-                {
-                    "type": "INT",
-                    "count": 4094
-                }
-            ],
-            "tags": [
-                {
-                    "type": "TINYINT",
-                    "count": 1
-                }
-            ]
+            "columns": [{ "type": "INT","count": 4093}],
+            "tags": [{"type": "TINYINT", "count": 1},{"type": "NCHAR","count": 1}]
         }
     ]
 }
@@ -98,9 +98,12 @@ class TDTestCase:
 
     def buildTaosd(self,bPath):
         # os.system(f"mv {bPath}/build_bak {bPath}/build ")
         os.system(f" cd {bPath} ")
 
+    def is_list_same_as_ordered_list(self,unordered_list, ordered_list):
+        sorted_list = sorted(unordered_list)
+        return sorted_list == ordered_list
+
     def run(self):
         scriptsPath = os.path.dirname(os.path.realpath(__file__))
         distro_id = distro.id()
@@ -146,6 +149,8 @@ class TDTestCase:
         tdLog.info(" LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y ")
         os.system("LD_LIBRARY_PATH=/usr/lib taosBenchmark -f 0-others/compa4096.json -y")
+        os.system("LD_LIBRARY_PATH=/usr/lib taos -s 'flush database db4096 '")
+        os.system("LD_LIBRARY_PATH=/usr/lib taos -f 0-others/TS-3131.tsql")
 
         cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;"
         if os.system(cmd) == 0:
             raise Exception("failed to execute system command. cmd: %s" % cmd)
@@ -220,6 +225,17 @@ class TDTestCase:
             tdLog.exit("%s(%d) failed" % args)
         tdsql.query("show streams;")
         tdsql.checkRows(2)
+        tdsql.query("select *,tbname from d0.almlog where mcid='m0103';")
+        tdsql.checkRows(6)
+        expectList = [0,3003,20031,20032,20033,30031]
+        resultList = []
+        for i in range(6):
+            resultList.append(tdsql.queryResult[i][3])
+        print(resultList)
+        if self.is_list_same_as_ordered_list(resultList,expectList):
+            print("The unordered list is the same as the ordered list.")
+        else:
+            tdlog.error("The unordered list is not the same as the ordered list.")
         tdsql.execute("insert into test.d80 values (now+1s, 11, 103, 0.21);")
         tdsql.execute("insert into test.d9 values (now+5s, 4.3, 104, 0.4);")
 
@@ -29,6 +29,9 @@ class TDTestCase:
         tdLog.debug("start to execute %s" % __file__)
         tdSql.init(conn.cursor())
         self.dbname = 'db_test'
+        self.ns_dbname = 'ns_test'
+        self.us_dbname = 'us_test'
+        self.ms_dbname = 'ms_test'
         self.setsql = TDSetSql()
         self.stbname = 'stb'
         self.ntbname = 'ntb'
@@ -220,11 +223,45 @@ class TDTestCase:
             tdSql.query(f'select {func}(*) from {self.stbname}')
         tdSql.execute(f'drop table {self.stbname}')
         tdSql.execute(f'drop database {self.dbname}')
 
+    def precision_now_check(self):
+        for dbname in [self.ms_dbname, self.us_dbname, self.ns_dbname]:
+            self.ts = 1537146000000
+            if dbname == self.us_dbname:
+                self.ts = int(self.ts*1000)
+                precision = "us"
+            elif dbname == self.ns_dbname:
+                precision = "ns"
+                self.ts = int(self.ts*1000000)
+            else:
+                precision = "ms"
+                self.ts = int(self.ts)
+            tdSql.execute(f'drop database if exists {dbname}')
+            tdSql.execute(f'create database if not exists {dbname} precision "{precision}"')
+            tdSql.execute(f'use {dbname}')
+            self.base_data = {
+                'tinyint':self.tinyint_val
+            }
+            self.column_dict = {
+                'col1': 'tinyint'
+            }
+            for col_name,col_type in self.column_dict.items():
+                tdSql.execute(f'create table if not exists {self.stbname} (ts timestamp,{col_name} {col_type}) tags(t1 int)')
+                for i in range(self.tbnum):
+                    tdSql.execute(f'create table if not exists {self.stbname}_{i} using {self.stbname} tags(1)')
+                    self.insert_base_data(col_type,f'{self.stbname}_{i}',self.rowNum,self.base_data)
+            tdSql.query(f'select * from {self.stbname}')
+            tdSql.checkEqual(tdSql.queryRows, self.tbnum*self.rowNum)
+            tdSql.execute(f'delete from {self.stbname} where ts < now()')
+            tdSql.query(f'select * from {self.stbname}')
+            tdSql.checkEqual(tdSql.queryRows, 0)
+
     def run(self):
         self.delete_data_stb()
         tdDnodes.stoptaosd(1)
         tdDnodes.starttaosd(1)
         self.delete_data_stb()
+        self.precision_now_check()
     def stop(self):
         tdSql.close()
         tdLog.success("%s successfully executed" % __file__)
@@ -42,7 +42,7 @@ class TDTestCase:
                     'showRow': 1}
 
         topicNameList = ['topic1', 'topic2']
-        expectRowsList = []
+        queryRowsList = []
         tmqCom.initConsumerTable()
         tdCom.create_database(tdSql, paraDict["dbName"],paraDict["dropFlag"], vgroups=4,replica=1)
         tdSql.execute("alter database %s wal_retention_period 3600" % (paraDict['dbName']))
@@ -60,7 +60,7 @@ class TDTestCase:
         tdLog.info("create topic sql: %s"%sqlString)
         tdSql.execute(sqlString)
         tdSql.query(queryString)
-        expectRowsList.append(tdSql.getRows())
+        queryRowsList.append(tdSql.getRows())
 
         # create one stb2
         paraDict["stbName"] = 'stb2'
@@ -77,7 +77,7 @@ class TDTestCase:
         tdLog.info("create topic sql: %s"%sqlString)
         tdSql.execute(sqlString)
         # tdSql.query(queryString)
-        # expectRowsList.append(tdSql.getRows())
+        # queryRowsList.append(tdSql.getRows())
 
         # init consume info, and start tmq_sim, then check consume result
         tdLog.info("insert consume info to consume processor")
@@ -99,7 +99,8 @@ class TDTestCase:
         pThread = tmqCom.asyncInsertData(paraDict)
 
         tdLog.info("wait consumer commit notify")
-        tmqCom.getStartCommitNotifyFromTmqsim(rows=4)
+        # tmqCom.getStartCommitNotifyFromTmqsim(rows=4)
+        tmqCom.getStartConsumeNotifyFromTmqsim(rows=2)
 
         tdLog.info("pkill one consume processor")
         tmqCom.stopTmqSimProcess('tmq_sim_new')
@@ -109,19 +110,21 @@ class TDTestCase:
         tdLog.info("wait the consume result")
         expectRows = 2
         resultList = tmqCom.selectConsumeResult(expectRows)
-        actTotalRows = 0
+        actConsumTotalRows = 0
         for i in range(len(resultList)):
-            actTotalRows += resultList[i]
+            actConsumTotalRows += resultList[i]
 
         tdLog.info("act consumer1 rows: %d, consumer2 rows: %d"%(resultList[0], resultList[1]))
 
         tdSql.query(queryString)
-        expectRowsList.append(tdSql.getRows())
-        expectTotalRows = 0
-        for i in range(len(expectRowsList)):
-            expectTotalRows += expectRowsList[i]
+        queryRowsList.append(tdSql.getRows())
+        queryTotalRows = 0
+        for i in range(len(queryRowsList)):
+            queryTotalRows += queryRowsList[i]
 
-        tdLog.info("act consume rows: %d, expect consume rows: %d"%(actTotalRows, expectTotalRows))
-        if expectTotalRows <= resultList[0]:
-            tdLog.info("act consume rows: %d should >= expect consume rows: %d"%(actTotalRows, expectTotalRows))
+        tdLog.info("act consume rows: %d, query consume rows: %d"%(actConsumTotalRows, queryTotalRows))
+        if actConsumTotalRows < queryTotalRows:
+            tdLog.info("act consume rows: %d should >= query consume rows: %d"%(actConsumTotalRows, queryTotalRows))
             tdLog.exit("0 tmq consume rows error!")
 
         # time.sleep(10)
@@ -130,9 +133,95 @@ class TDTestCase:
 
         tdLog.printNoPrefix("======== test case 1 end ...... ")
 
+
+    def tmqCase2(self):
+        tdLog.printNoPrefix("======== test case 2: ")
+        paraDict = {'dbName': 'db1',
+                    'dropFlag': 1,
+                    'event': '',
+                    'vgroups': 4,
+                    'stbName': 'stb',
+                    'colPrefix': 'c',
+                    'tagPrefix': 't',
+                    'colSchema': [{'type': 'INT', 'count':2}, {'type': 'binary', 'len':20, 'count':1},{'type': 'TIMESTAMP', 'count':1}],
+                    'tagSchema': [{'type': 'INT', 'count':1}, {'type': 'binary', 'len':20, 'count':1}],
+                    'ctbPrefix': 'ctb',
+                    'ctbNum': 10,
+                    'rowsPerTbl': 1000,
+                    'batchNum': 10,
+                    'startTs': 1640966400000,  # 2022-01-01 00:00:00.000
+                    'pollDelay': 20,
+                    'showMsg': 1,
+                    'showRow': 1}
+
+        topicNameList = ['topic3', 'topic4']
+        queryRowsList = []
+        tmqCom.initConsumerTable()
+
+        tdLog.info("create topics from stb with filter")
+        # queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
+        queryString = "select ts, log(c1), ceil(pow(c1,3)) from %s.%s" %(paraDict['dbName'], paraDict['stbName'])
+        sqlString = "create topic %s as %s" %(topicNameList[0], queryString)
+        tdLog.info("create topic sql: %s"%sqlString)
+        tdSql.execute(sqlString)
+        tdSql.query(queryString)
+        queryRowsList.append(tdSql.getRows())
+
+        # create one stb2
+        paraDict["stbName"] = 'stb2'
+        # queryString = "select ts, sin(c1), abs(pow(c1,3)) from %s.%s where c1 %% 7 == 0" %(paraDict['dbName'], paraDict['stbName'])
+        queryString = "select ts, sin(c1), abs(pow(c1,3)) from %s.%s" %(paraDict['dbName'], paraDict['stbName'])
+        sqlString = "create topic %s as %s" %(topicNameList[1], queryString)
+        tdLog.info("create topic sql: %s"%sqlString)
+        tdSql.execute(sqlString)
+        tdSql.query(queryString)
+        queryRowsList.append(tdSql.getRows())
+
+        # init consume info, and start tmq_sim, then check consume result
+        tdLog.info("insert consume info to consume processor")
+        consumerId = 0
+        paraDict["rowsPerTbl"] = 5000
+        expectrowcnt = paraDict["rowsPerTbl"] * paraDict["ctbNum"] * 2
+        topicList = "%s,%s"%(topicNameList[0],topicNameList[1])
+        ifcheckdata = 1
+        ifManualCommit = 1
+        keyList = 'group.id:cgrp1, enable.auto.commit:true, auto.commit.interval.ms:3000, auto.offset.reset:earliest'
+        tmqCom.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
+
+        tdLog.info("start consume processor 1")
+        tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'])
+
+        tdLog.info("start consume processor 2")
+        tmqCom.startTmqSimProcess(paraDict['pollDelay'],paraDict["dbName"],paraDict['showMsg'], paraDict['showRow'],'cdb',0,1)
+
+        tdLog.info("wait the consume result")
+        expectRows = 2
+        resultList = tmqCom.selectConsumeResult(expectRows)
+        actConsumTotalRows = 0
+        for i in range(len(resultList)):
+            actConsumTotalRows += resultList[i]
+
+        tdLog.info("act consumer1 rows: %d, consumer2 rows: %d"%(resultList[0], resultList[1]))
+
+        queryTotalRows = 0
+        for i in range(len(queryRowsList)):
+            queryTotalRows += queryRowsList[i]
+
+        tdLog.info("act consume rows: %d, query consume rows: %d"%(actConsumTotalRows, queryTotalRows))
+        if actConsumTotalRows < queryTotalRows:
+            tdLog.info("act consume rows: %d should >= query consume rows: %d"%(actConsumTotalRows, queryTotalRows))
+            tdLog.exit("0 tmq consume rows error!")
+
+        # time.sleep(10)
+        # for i in range(len(topicNameList)):
+        #     tdSql.query("drop topic %s"%topicNameList[i])
+
+        tdLog.printNoPrefix("======== test case 2 end ...... ")
+
     def run(self):
         tdSql.prepare()
         self.tmqCase1()
+        self.tmqCase2()
 
     def stop(self):
         tdSql.close()
@@ -58,7 +58,6 @@ int32_t shellRunSingleCommand(char *command) {
   }
 
   if (shellRegexMatch(command, "^[ \t]*(quit|q|exit)[ \t;]*$", REG_EXTENDED | REG_ICASE)) {
-    shellWriteHistory();
     return -1;
   }
 
@@ -887,7 +886,6 @@ void shellWriteHistory() {
     }
     i = (i + 1) % SHELL_MAX_HISTORY_SIZE;
   }
-  taosFsyncFile(pFile);
   taosCloseFile(&pFile);
 }
 
@@ -83,6 +83,9 @@ int main(int argc, char *argv[]) {
 #endif
   taos_init();
 
+  // kill the heart-beat thread when the shell quits
+  taos_set_hb_quit(1);
+
   if (shell.args.is_dump_config) {
     shellDumpConfig();
     taos_cleanup();