Merge remote-tracking branch 'origin/main' into fix/TD-21663-2
commit 3973e0512d

@@ -15,13 +15,13 @@

#define _DEFAULT_SOURCE
#include "mndMnode.h"
#include "mndCluster.h"
#include "mndDnode.h"
#include "mndPrivilege.h"
#include "mndShow.h"
#include "mndSync.h"
#include "mndTrans.h"
#include "tmisce.h"
#include "mndCluster.h"

#define MNODE_VER_NUMBER 1
#define MNODE_RESERVE_SIZE 64

@@ -181,9 +181,8 @@ _OVER:

static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
  mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj);
  pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
  pObj->pDnode = sdbAcquireNotReadyObj(pSdb, SDB_DNODE, &pObj->id);
  if (pObj->pDnode == NULL) {
    terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
    mError("mnode:%d, failed to perform insert action since %s", pObj->id, terrstr());
    return -1;
  }

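Editor's note: switching to sdbAcquireNotReadyObj lets the insert action bind the mnode row to its dnode row even when that dnode row has not yet reached the ready state (for example while the table is still being restored), instead of failing with TSDB_CODE_MND_DNODE_NOT_EXIST. The reference taken here is presumably dropped in the matching delete action; a minimal sketch of that pairing, written by the editor rather than copied from the repository:

    /* Editor's sketch (illustration, not repository code): the delete hook
     * releases what the insert hook acquired, keeping the dnode row's
     * reference count balanced. */
    static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pObj) {
      mTrace("mnode:%d, perform delete action, row:%p", pObj->id, pObj);
      if (pObj->pDnode != NULL) {
        sdbRelease(pSdb, pObj->pDnode);  /* balances sdbAcquireNotReadyObj() above */
        pObj->pDnode = NULL;
      }
      return 0;
    }
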
@@ -291,6 +291,7 @@ int32_t sdbWriteWithoutFree(SSdb *pSdb, SSdbRaw *pRaw);
 * @return void* The object of the row.
 */
void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey);
void *sdbAcquireNotReadyObj(SSdb *pSdb, ESdbType type, const void *pKey);

/**
 * @brief Release a row from sdb.

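Editor's note: only the prototype is added to the header; a hedged sketch of how its contract could be documented in the same doxygen style (wording is the editor's, not part of the commit):

    /**
     * @brief Acquire a row from sdb by key, like sdbAcquire(), but without
     *        requiring the row to be in the ready state. The reference count
     *        is still incremented, so the caller must pair this call with
     *        sdbRelease(). (Editor's sketch of a possible doc comment.)
     */
    void *sdbAcquireNotReadyObj(SSdb *pSdb, ESdbType type, const void *pKey);
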
@@ -270,7 +270,7 @@ int32_t sdbWrite(SSdb *pSdb, SSdbRaw *pRaw) {
  return code;
}

void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
void *sdbAcquireAll(SSdb *pSdb, ESdbType type, const void *pKey, bool onlyReady) {
  terrno = 0;

  SHashObj *hash = sdbGetHash(pSdb, type);

@@ -306,10 +306,24 @@ void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) {
      break;
  }

  if (pRet == NULL) {
    if (!onlyReady) {
      terrno = 0;
      atomic_add_fetch_32(&pRow->refCount, 1);
      pRet = pRow->pObj;
      sdbPrintOper(pSdb, pRow, "acquire");
    }
  }

  sdbUnLock(pSdb, type);
  return pRet;
}

void *sdbAcquire(SSdb *pSdb, ESdbType type, const void *pKey) { return sdbAcquireAll(pSdb, type, pKey, true); }
void *sdbAcquireNotReadyObj(SSdb *pSdb, ESdbType type, const void *pKey) {
  return sdbAcquireAll(pSdb, type, pKey, false);
}

static void sdbCheckRow(SSdb *pSdb, SSdbRow *pRow) {
  int32_t type = pRow->type;
  sdbWriteLock(pSdb, type);

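Editor's note: the shared lookup now lives in sdbAcquireAll(), with the ready-status filter controlled by a single onlyReady flag and both public entry points reduced to thin wrappers. A self-contained sketch of the same shape, using illustrative names only (none of the types or functions below are TDengine APIs):

    /* Editor's sketch (illustration only, not TDengine code): one internal
     * lookup that takes an "onlyReady" flag, plus two thin public wrappers,
     * so the status filter lives in exactly one place. */
    #include <stdbool.h>
    #include <stdio.h>

    typedef enum { ENTRY_CREATING, ENTRY_READY, ENTRY_DROPPING } EEntryStatus;

    typedef struct {
      EEntryStatus status;
      int          refCount;
      void        *pObj;
    } SEntry;

    static void *entryAcquireImpl(SEntry *pEntry, bool onlyReady) {
      if (pEntry == NULL) return NULL;
      if (onlyReady && pEntry->status != ENTRY_READY) {
        return NULL;                   /* a not-ready row looks like a miss */
      }
      pEntry->refCount++;              /* reference is taken in both paths  */
      return pEntry->pObj;
    }

    static void *entryAcquire(SEntry *pEntry)         { return entryAcquireImpl(pEntry, true);  }
    static void *entryAcquireNotReady(SEntry *pEntry) { return entryAcquireImpl(pEntry, false); }

    int main(void) {
      int    payload = 42;
      SEntry e = {.status = ENTRY_CREATING, .refCount = 0, .pObj = &payload};
      printf("ready-only acquire: %p\n", entryAcquire(&e));         /* (nil)    */
      printf("not-ready acquire:  %p\n", entryAcquireNotReady(&e)); /* non-NULL */
      printf("refCount after:     %d\n", e.refCount);               /* 1        */
      return 0;
    }
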
@@ -192,6 +192,8 @@ SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) {
  return SYNC_TERM_INVALID;
}

static inline bool raftLogForceSync(SSyncRaftEntry* pEntry) { return (pEntry->originalRpcType == TDMT_VND_COMMIT); }

static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
  SSyncLogStoreData* pData = pLogStore->data;
  SWal*              pWal = pData->pWal;

@@ -219,9 +221,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr

  ASSERT(pEntry->index == index);

  if (pEntry->originalRpcType == TDMT_VND_COMMIT) {
    walFsync(pWal, true);
  }
  bool forceSync = raftLogForceSync(pEntry);
  walFsync(pWal, forceSync);

  sNTrace(pData->pSyncNode, "write index:%" PRId64 ", type:%s, origin type:%s, elapsed:%" PRId64, pEntry->index,
          TMSG_INFO(pEntry->msgType), TMSG_INFO(pEntry->originalRpcType), tsElapsed);

@@ -1316,11 +1316,11 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,
      }
      TDB_CELLDECODER_SET_FREE_KEY(pDecoder);

      memcpy(pDecoder->pKey, pCell + nHeader, nLocal - 4);
      nLeft -= nLocal - 4;
      nLeftKey -= nLocal - 4;
      memcpy(pDecoder->pKey, pCell + nHeader, nLocal - nHeader - sizeof(pgno));
      nLeft -= nLocal - nHeader - sizeof(pgno);
      nLeftKey -= nLocal - nHeader - sizeof(pgno);

      memcpy(&pgno, pCell + nHeader + nLocal - 4, sizeof(pgno));
      memcpy(&pgno, pCell + nLocal - sizeof(pgno), sizeof(pgno));

      int lastKeyPageSpace = 0;
      // load left key & val to ovpages

@@ -1346,9 +1346,11 @@ static int tdbBtreeDecodePayload(SPage *pPage, const SCell *pCell, int nHeader,

      if (lastKeyPage) {
        if (lastKeyPageSpace >= vLen) {
          pDecoder->pVal = ofpCell + kLen - nLeftKey;
          if (vLen > 0) {
            pDecoder->pVal = ofpCell + kLen - nLeftKey;

            nLeft -= vLen;
            nLeft -= vLen;
          }
          pgno = 0;
        } else {
          // read partial val to local

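Editor's note: the fix in this file replaces a hard-coded 4 with arithmetic over the cell header size and sizeof(pgno) when splitting an overflowed key cell. A small sketch of the assumed layout and the corrected offsets (layout, sizes, and names here are the editor's illustration, not tdb's exact on-page format):

    /* Editor's sketch (illustration only): a cell whose key spills to an
     * overflow page is assumed to be laid out as
     *   [ header | local part of key | overflow page number ]
     * so the locally stored key bytes depend on the header size and on
     * sizeof(pgno), which is what the corrected expressions compute. */
    #include <stdint.h>
    #include <stdio.h>
    #include <string.h>

    typedef uint32_t SPgno;

    int main(void) {
      unsigned char cell[64] = {0};
      int   nHeader = 6;    /* illustrative header size                  */
      int   nLocal  = 24;   /* bytes of this cell that stay on the page  */
      SPgno pgno    = 1234; /* overflow page holding the rest of the key */

      /* bytes of key stored locally, as in the fixed code path */
      int localKeyBytes = nLocal - nHeader - (int)sizeof(pgno);

      /* the overflow page number sits at the very end of the local area */
      memcpy(cell + nLocal - sizeof(pgno), &pgno, sizeof(pgno));

      SPgno readBack = 0;
      memcpy(&readBack, cell + nLocal - sizeof(pgno), sizeof(readBack));

      printf("local key bytes: %d, overflow pgno: %u\n", localKeyBytes, readBack);
      return 0;
    }
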
@@ -637,11 +637,6 @@ int32_t walWrite(SWal *pWal, int64_t index, tmsg_t msgType, const void *body, in
void walFsync(SWal *pWal, bool forceFsync) {
  taosThreadMutexLock(&pWal->mutex);
  if (forceFsync || (pWal->cfg.level == TAOS_WAL_FSYNC && pWal->cfg.fsyncPeriod == 0)) {
    wTrace("vgId:%d, fileId:%" PRId64 ".idx, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
    if (taosFsyncFile(pWal->pIdxFile) < 0) {
      wError("vgId:%d, file:%" PRId64 ".idx, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),
             strerror(errno));
    }
    wTrace("vgId:%d, fileId:%" PRId64 ".log, do fsync", pWal->cfg.vgId, walGetCurFileFirstVer(pWal));
    if (taosFsyncFile(pWal->pLogFile) < 0) {
      wError("vgId:%d, file:%" PRId64 ".log, fsync failed since %s", pWal->cfg.vgId, walGetCurFileFirstVer(pWal),

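Editor's note: combined with the raft-log change above, the durability rule appears to be: an entry that forces sync (currently a commit message) always fsyncs the WAL, and otherwise walFsync only flushes when the WAL level is TAOS_WAL_FSYNC with fsyncPeriod set to 0. A minimal sketch of that predicate, with illustrative constants rather than the real configuration values:

    /* Editor's sketch of the combined fsync policy, not repository code.
     * forceFsync comes from the caller (e.g. the raftLogForceSync predicate),
     * the other two arguments mirror the WAL configuration checked above. */
    #include <stdbool.h>
    #include <stdio.h>

    enum { WAL_LEVEL_WRITE = 1, WAL_LEVEL_FSYNC = 2 };  /* illustrative values */

    static bool shouldFsync(bool forceFsync, int level, int fsyncPeriodMs) {
      return forceFsync || (level == WAL_LEVEL_FSYNC && fsyncPeriodMs == 0);
    }

    int main(void) {
      printf("%d\n", shouldFsync(true,  WAL_LEVEL_WRITE, 3000)); /* 1: forced by caller */
      printf("%d\n", shouldFsync(false, WAL_LEVEL_FSYNC, 0));    /* 1: synchronous mode */
      printf("%d\n", shouldFsync(false, WAL_LEVEL_FSYNC, 3000)); /* 0: periodic fsync   */
      return 0;
    }
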
@@ -445,6 +445,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/database_pre_suf.py
,,y,system-test,./pytest.sh python3 ./test.py -f 1-insert/InsertFuturets.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/show.py
,,y,system-test,./pytest.sh python3 ./test.py -f 0-others/information_schema.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/abs.py -R
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/and_or_for_byte.py

@@ -0,0 +1,113 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################

# -*- coding: utf-8 -*-


from util.log import *
from util.cases import *
from util.sql import *
from util.common import *
from util.sqlset import *

class TDTestCase:
    def init(self, conn, logSql, replicaVar=1):
        self.replicaVar = int(replicaVar)
        tdLog.debug("start to execute %s" % __file__)
        tdSql.init(conn.cursor())
        self.setsql = TDSetSql()
        self.dbname = 'db'
        self.stbname = 'stb'
        self.binary_length = 20 # the length of binary for column_dict
        self.nchar_length = 20  # the length of nchar for column_dict
        self.ts = 1537146000000
        self.column_dict = {
            'ts' : 'timestamp',
            'col1': 'tinyint',
            'col2': 'smallint',
            'col3': 'int',
            'col4': 'bigint',
            'col5': 'tinyint unsigned',
            'col6': 'smallint unsigned',
            'col7': 'int unsigned',
            'col8': 'bigint unsigned',
            'col9': 'float',
            'col10': 'double',
            'col11': 'bool',
            'col12': f'binary({self.binary_length})',
            'col13': f'nchar({self.nchar_length})'
        }
        self.tbnum = 20
        self.rowNum = 10
        self.tag_dict = {
            't0':'int'
        }
        self.tag_values = [
            f'1'
        ]
        self.binary_str = 'taosdata'
        self.nchar_str = '涛思数据'
        self.ins_list = ['ins_dnodes','ins_mnodes','ins_modules','ins_qnodes','ins_snodes','ins_cluster','ins_databases','ins_functions',\
            'ins_indexes','ins_stables','ins_tables','ins_tags','ins_users','ins_grants','ins_vgroups','ins_configs','ins_dnode_variables',\
            'ins_topics','ins_subscriptions','ins_streams','ins_stream_tasks','ins_vnodes','ins_user_privileges']
        self.perf_list = ['perf_connections','perf_queries','perf_consumers','perf_trans','perf_apps']
    def insert_data(self,column_dict,tbname,row_num):
        insert_sql = self.setsql.set_insertsql(column_dict,tbname,self.binary_str,self.nchar_str)
        for i in range(row_num):
            insert_list = []
            self.setsql.insert_values(column_dict,i,insert_sql,insert_list,self.ts)
    def prepare_data(self):
        tdSql.execute(f"create database if not exists {self.dbname} vgroups 2")
        tdSql.execute(f'use {self.dbname}')
        tdSql.execute(self.setsql.set_create_stable_sql(self.stbname,self.column_dict,self.tag_dict))
        for i in range(self.tbnum):
            tdSql.execute(f"create table {self.stbname}_{i} using {self.stbname} tags({self.tag_values[0]})")
            self.insert_data(self.column_dict,f'{self.stbname}_{i}',self.rowNum)
    def count_check(self):
        tdSql.query('select count(*) from information_schema.ins_tables')
        tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum+len(self.ins_list)+len(self.perf_list))
        tdSql.query(f'select count(*) from information_schema.ins_tables where db_name = "{self.dbname}"')
        tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum)
        tdSql.query(f'select count(*) from information_schema.ins_tables where db_name = "{self.dbname}" and stable_name = "{self.stbname}"')
        tdSql.checkEqual(tdSql.queryResult[0][0],self.tbnum)
        tdSql.execute('create database db1')
        tdSql.execute('create table stb1 (ts timestamp,c0 int) tags(t0 int)')
        tdSql.execute('create table tb1 using stb1 tags(1)')
        tdSql.query(f'select db_name, stable_name, count(*) from information_schema.ins_tables group by db_name, stable_name')
        for i in tdSql.queryResult:
            if i[0].lower() == 'information_schema':
                tdSql.checkEqual(i[2],len(self.ins_list))
            elif i[0].lower() == self.dbname and i[1] == self.stbname:
                tdSql.checkEqual(i[2],self.tbnum)
            elif i[0].lower() == self.dbname and i[1] == 'stb1':
                tdSql.checkEqual(i[2],1)
            elif i[0].lower() == 'performance_schema':
                tdSql.checkEqual(i[2],len(self.perf_list))
        tdSql.execute('create table db1.ntb (ts timestamp,c0 int)')
        tdSql.query(f'select db_name, count(*) from information_schema.ins_tables group by db_name')
        print(tdSql.queryResult)
        for i in tdSql.queryResult:
            if i[0].lower() == 'information_schema':
                tdSql.checkEqual(i[1],len(self.ins_list))
            elif i[0].lower() == 'performance_schema':
                tdSql.checkEqual(i[1],len(self.perf_list))
            elif i[0].lower() == self.dbname:
                tdSql.checkEqual(i[1],self.tbnum+1)
    def run(self):
        self.prepare_data()
        self.count_check()

    def stop(self):
        tdSql.close()
        tdLog.success("%s successfully executed" % __file__)

tdCases.addWindows(__file__, TDTestCase())
tdCases.addLinux(__file__, TDTestCase())

@@ -112,7 +112,8 @@ class TDTestCase:
            dnode_first_port = dnode.cfgDict["firstEp"].split(":")[-1]
            cmd = f" taos -h {dnode_first_host} -P {dnode_first_port} -s ' create dnode \"{dnode_id} \" ' ;"
            tdLog.debug(cmd)
            os.system(cmd)
            if os.system(cmd) != 0:
                raise Exception("failed to execute system command. cmd: %s" % cmd)

        time.sleep(2)
        tdLog.info(" create cluster with %d dnode done! " %dnodes_nums)

@@ -292,6 +293,8 @@ class TDTestCase:
                    tdLog.debug("drop mnode %d successfully"%(i+1))
                    break
                count+=1
            self.wait_for_transactions(20)

            tdLog.debug("create mnode on dnode %d"%(i+1))
            tdSql.execute("create mnode on dnode %d"%(i+1))
            count=0

@@ -299,12 +302,24 @@ class TDTestCase:
                time.sleep(1)
                tdSql.query("select * from information_schema.ins_mnodes;")
                if tdSql.checkRows(3):
                    tdLog.debug("drop mnode %d successfully"%(i+1))
                    tdLog.debug("create mnode %d successfully"%(i+1))
                    break
                count+=1
            self.wait_for_transactions(20)
            dropcount+=1
        self.check3mnode()

    def wait_for_transactions(self, timeout):
        count=0
        while count<timeout:
            time.sleep(1)
            tdSql.query("show transactions;")
            if tdSql.checkRows(0):
                tdLog.debug("transactions completed successfully")
                break
            count+=1
        if count >= timeout:
            tdLog.debug("transactions not finished before timeout (%d secs)", timeout)

    def getConnection(self, dnode):
        host = dnode.cfgDict["fqdn"]