Merge branch '3.0' into feature/TD-11274-3.0
This commit is contained in:
commit
26cd9f8a25
|
@ -472,7 +472,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
|
|||
}
|
||||
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_DB, pReq);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
|
||||
if (pTrans == NULL) goto _OVER;
|
||||
|
||||
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
|
||||
|
|
|
@ -1347,13 +1347,11 @@ int32_t mndKillTrans(SMnode *pMnode, STrans *pTrans) {
|
|||
|
||||
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
|
||||
STransAction *pAction = taosArrayGet(pArray, i);
|
||||
if (pAction->errCode != 0) {
|
||||
mInfo("trans:%d, %s:%d set processed for kill msg received, errCode from %s to success", pTrans->id,
|
||||
mndTransStr(pAction->stage), i, tstrerror(pAction->errCode));
|
||||
pAction->msgSent = 1;
|
||||
pAction->msgReceived = 1;
|
||||
pAction->errCode = 0;
|
||||
}
|
||||
mInfo("trans:%d, %s:%d set processed for kill msg received, errCode from %s to success", pTrans->id,
|
||||
mndTransStr(pAction->stage), i, tstrerror(pAction->errCode));
|
||||
pAction->msgSent = 1;
|
||||
pAction->msgReceived = 1;
|
||||
pAction->errCode = 0;
|
||||
}
|
||||
|
||||
mndTransExecute(pMnode, pTrans);
|
||||
|
|
|
@ -940,7 +940,7 @@ static int32_t mndAddSetVnodeStandByAction(SMnode *pMnode, STrans *pTrans, SDbOb
|
|||
|
||||
action.pCont = pReq;
|
||||
action.contLen = contLen;
|
||||
action.msgType = TDMT_DND_DROP_VNODE;
|
||||
action.msgType = TDMT_SYNC_SET_VNODE_STANDBY;
|
||||
action.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED;
|
||||
|
||||
if (isRedo) {
|
||||
|
|
|
@ -66,7 +66,13 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
vInfo("vgId:%d, replica:%d %s:%u", TD_VID(pVnode), r, pNode->nodeFqdn, pNode->nodePort);
|
||||
}
|
||||
|
||||
return syncReconfig(pVnode->sync, &cfg);
|
||||
SRpcMsg rpcMsg = {.info = pMsg->info};
|
||||
if (syncReconfigBuild(pVnode->sync, &cfg, &rpcMsg) != 0) {
|
||||
vError("vgId:%d, failed to build reconfig msg since %s", TD_VID(pVnode), terrstr());
|
||||
return -1;
|
||||
}
|
||||
|
||||
return syncPropose(pVnode->sync, &rpcMsg, false);
|
||||
}
|
||||
|
||||
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
|
@ -241,6 +247,30 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
|
|||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
}
|
||||
|
||||
int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||
pFsm->data = pVnode;
|
||||
|
@ -250,6 +280,14 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
|||
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
|
||||
pFsm->FpRestoreFinishCb = NULL;
|
||||
pFsm->FpReConfigCb = vnodeSyncReconfig;
|
||||
|
||||
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
|
||||
pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
|
||||
pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
|
||||
pFsm->FpSnapshotStartWrite = vnodeSnapshotStartWrite;
|
||||
pFsm->FpSnapshotStopWrite = vnodeSnapshotStopWrite;
|
||||
pFsm->FpSnapshotDoWrite = vnodeSnapshotDoWrite;
|
||||
|
||||
return pFsm;
|
||||
}
|
||||
|
||||
|
|
|
@ -58,7 +58,7 @@
|
|||
# ---- mnode
|
||||
./test.sh -f tsim/mnode/basic1.sim
|
||||
./test.sh -f tsim/mnode/basic2.sim
|
||||
#./test.sh -f tsim/mnode/basic3.sim
|
||||
./test.sh -f tsim/mnode/basic3.sim
|
||||
./test.sh -f tsim/mnode/basic4.sim
|
||||
./test.sh -f tsim/mnode/basic5.sim
|
||||
|
||||
|
|
|
@ -163,26 +163,22 @@ endi
|
|||
print =============== step32: move follower2
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower2 dnode 5
|
||||
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
=======
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
return
|
||||
|
||||
print =============== step33: move follower1
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode $follower1 dnode 5
|
||||
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
=======
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
|
@ -191,12 +187,9 @@ endi
|
|||
print =============== step34: move follower2
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
|
||||
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
=======
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
|
||||
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
|
@ -205,15 +198,8 @@ endi
|
|||
print =============== step35: move follower1
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
|
||||
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
=======
|
||||
sql show d1.vgroups
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05 $data06 $data07 $data08 $data09
|
||||
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
|
||||
sql show d1.tables
|
||||
if $rows != 1 then
|
||||
|
@ -242,8 +228,6 @@ if $rows != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
<<<<<<< HEAD:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_leader.sim
|
||||
=======
|
||||
print =============== step38: move follower2
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
|
||||
sql redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower2
|
||||
|
@ -254,7 +238,6 @@ sql show d1.tables
|
|||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
>>>>>>> origin/3.0:tests/script/tsim/dnode/redistribute_vgroup_replica3_v1_follower.sim
|
||||
|
||||
print =============== step39: move follower1
|
||||
print redistribute vgroup 2 dnode $leaderVnode dnode 5 dnode $follower1
|
||||
|
|
|
@ -7,44 +7,28 @@ system sh/exec.sh -n dnode1 -s start
|
|||
system sh/exec.sh -n dnode2 -s start
|
||||
sql connect
|
||||
|
||||
print =============== show dnodes
|
||||
sql show dnodes;
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql show mnodes;
|
||||
if $rows != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 1 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data02 != leader then
|
||||
return -1
|
||||
endi
|
||||
|
||||
print =============== create dnodes
|
||||
sql create dnode $hostname port 7200
|
||||
sleep 2000
|
||||
|
||||
sql show dnodes;
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data00 != 1 then
|
||||
$x = 0
|
||||
step1:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data10 != 2 then
|
||||
return -1
|
||||
if $data(1)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
if $data(2)[4] != ready then
|
||||
goto step1
|
||||
endi
|
||||
|
||||
print =============== kill dnode2
|
||||
|
@ -68,7 +52,7 @@ if $data[0][0] != 7 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][2] != undoAction then
|
||||
if $data[0][2] != redoAction then
|
||||
return -1
|
||||
endi
|
||||
|
||||
|
@ -80,14 +64,34 @@ sql_error create database d1 vgroups 2;
|
|||
|
||||
print =============== start dnode2
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
sleep 3000
|
||||
|
||||
$x = 0
|
||||
step2:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data(1)[4] != ready then
|
||||
goto step2
|
||||
endi
|
||||
if $data(2)[4] != ready then
|
||||
goto step2
|
||||
endi
|
||||
|
||||
sql show transactions
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql create database d1 vgroups 2;
|
||||
sql_error create database d1 vgroups 2;
|
||||
|
||||
print =============== kill dnode2
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
||||
|
@ -106,22 +110,31 @@ if $rows != 1 then
|
|||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][0] != 9 then
|
||||
if $data[0][0] != 8 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][2] != undoAction then
|
||||
if $data[0][2] != redoAction then
|
||||
return -1
|
||||
endi
|
||||
|
||||
if $data[0][3] != d2 then
|
||||
return -1
|
||||
endi
|
||||
return
|
||||
|
||||
sql show databases ;
|
||||
if $rows != 4 then
|
||||
return -1
|
||||
endi
|
||||
print d2 ==> $data(d2)[19]
|
||||
if $data(d2)[19] != creating then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql_error create database d2 vgroups 2;
|
||||
|
||||
print =============== kill transaction
|
||||
sql kill transaction 9;
|
||||
sql kill transaction 8;
|
||||
sleep 2000
|
||||
|
||||
sql show transactions
|
||||
|
@ -131,7 +144,34 @@ endi
|
|||
|
||||
print =============== start dnode2
|
||||
system sh/exec.sh -n dnode2 -s start
|
||||
sleep 3000
|
||||
|
||||
$x = 0
|
||||
step3:
|
||||
$x = $x + 1
|
||||
sleep 1000
|
||||
if $x == 10 then
|
||||
print ====> dnode not ready!
|
||||
return -1
|
||||
endi
|
||||
sql show dnodes
|
||||
print ===> $data00 $data01 $data02 $data03 $data04 $data05
|
||||
print ===> $data10 $data11 $data12 $data13 $data14 $data15
|
||||
if $rows != 2 then
|
||||
return -1
|
||||
endi
|
||||
if $data(1)[4] != ready then
|
||||
goto step3
|
||||
endi
|
||||
if $data(2)[4] != ready then
|
||||
goto step3
|
||||
endi
|
||||
|
||||
sql show transactions
|
||||
if $rows != 0 then
|
||||
return -1
|
||||
endi
|
||||
|
||||
sql drop database d2;
|
||||
|
||||
sql show transactions
|
||||
if $rows != 0 then
|
||||
|
@ -145,6 +185,5 @@ sql_error kill transaction 3;
|
|||
sql_error kill transaction 4;
|
||||
sql_error kill transaction 5;
|
||||
|
||||
return
|
||||
system sh/exec.sh -n dnode1 -s stop -x SIGINT
|
||||
system sh/exec.sh -n dnode2 -s stop -x SIGINT
|
|
@ -12,8 +12,8 @@ from util.dnodes import TDDnode
|
|||
import time
|
||||
import socket
|
||||
import subprocess
|
||||
from multiprocessing import Process
|
||||
import threading as thd
|
||||
|
||||
class MyDnodes(TDDnodes):
|
||||
def __init__(self ,dnodes_lists):
|
||||
super(MyDnodes,self).__init__()
|
||||
|
@ -30,7 +30,6 @@ class TDTestCase:
|
|||
self.depoly_cluster(dnodenumber)
|
||||
self.master_dnode = self.TDDnodes.dnodes[0]
|
||||
self.host=self.master_dnode.cfgDict["fqdn"]
|
||||
|
||||
conn1 = taos.connect(self.master_dnode.cfgDict["fqdn"] , config=self.master_dnode.cfgDir)
|
||||
tdSql.init(conn1.cursor())
|
||||
|
||||
|
@ -50,7 +49,7 @@ class TDTestCase:
|
|||
buildPath = root[:len(root) - len("/build/bin")]
|
||||
break
|
||||
return buildPath
|
||||
|
||||
|
||||
def insert_data(self,countstart,countstop):
|
||||
# fisrt add data : db\stable\childtable\general table
|
||||
for couti in range(countstart,countstop):
|
||||
|
@ -139,14 +138,15 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("show mnodes;")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(0,1,'chenhaoran02:6030')
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(0,3,'ready')
|
||||
tdSql.checkData(1,1,'chenhaoran02:6130')
|
||||
tdSql.checkData(1,1,'%s:6130'%self.host)
|
||||
tdSql.checkData(1,3,'ready')
|
||||
tdSql.checkData(2,1,'chenhaoran02:6230')
|
||||
tdSql.checkData(2,1,'%s:6230'%self.host)
|
||||
tdSql.checkData(2,3,'ready')
|
||||
|
||||
def check3mnode1off(self):
|
||||
tdSql.error("drop mnode on dnode 1;")
|
||||
count=0
|
||||
while count < 10:
|
||||
time.sleep(1)
|
||||
|
@ -167,17 +167,18 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("show mnodes;")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(0,1,'chenhaoran02:6030')
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(0,2,'offline')
|
||||
tdSql.checkData(0,3,'ready')
|
||||
tdSql.checkData(1,1,'chenhaoran02:6130')
|
||||
tdSql.checkData(1,1,'%s:6130'%self.host)
|
||||
tdSql.checkData(1,3,'ready')
|
||||
tdSql.checkData(2,1,'chenhaoran02:6230')
|
||||
tdSql.checkData(2,1,'%s:6230'%self.host)
|
||||
tdSql.checkData(2,3,'ready')
|
||||
|
||||
def check3mnode2off(self):
|
||||
tdSql.error("drop mnode on dnode 2;")
|
||||
count=0
|
||||
while count < 10:
|
||||
while count < 40:
|
||||
time.sleep(1)
|
||||
tdSql.query("show mnodes;")
|
||||
if tdSql.checkRows(3) :
|
||||
|
@ -192,17 +193,18 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("show mnodes;")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(0,1,'chenhaoran02:6030')
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(0,2,'leader')
|
||||
tdSql.checkData(0,3,'ready')
|
||||
tdSql.checkData(1,1,'chenhaoran02:6130')
|
||||
tdSql.checkData(1,1,'%s:6130'%self.host)
|
||||
tdSql.checkData(1,2,'offline')
|
||||
tdSql.checkData(1,3,'ready')
|
||||
tdSql.checkData(2,1,'chenhaoran02:6230')
|
||||
tdSql.checkData(2,1,'%s:6230'%self.host)
|
||||
tdSql.checkData(2,2,'follower')
|
||||
tdSql.checkData(2,3,'ready')
|
||||
|
||||
def check3mnode3off(self):
|
||||
tdSql.error("drop mnode on dnode 3;")
|
||||
count=0
|
||||
while count < 10:
|
||||
time.sleep(1)
|
||||
|
@ -219,13 +221,13 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("show mnodes;")
|
||||
tdSql.checkRows(3)
|
||||
tdSql.checkData(0,1,'chenhaoran02:6030')
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(0,2,'leader')
|
||||
tdSql.checkData(0,3,'ready')
|
||||
tdSql.checkData(1,1,'chenhaoran02:6130')
|
||||
tdSql.checkData(1,1,'%s:6130'%self.host)
|
||||
tdSql.checkData(1,2,'follower')
|
||||
tdSql.checkData(1,3,'ready')
|
||||
tdSql.checkData(2,1,'chenhaoran02:6230')
|
||||
tdSql.checkData(2,1,'%s:6230'%self.host)
|
||||
tdSql.checkData(2,2,'offline')
|
||||
tdSql.checkData(2,3,'ready')
|
||||
|
||||
|
@ -233,13 +235,13 @@ class TDTestCase:
|
|||
|
||||
def five_dnode_three_mnode(self,dnodenumber):
|
||||
tdSql.query("show dnodes;")
|
||||
tdSql.checkData(0,1,'chenhaoran02:6030')
|
||||
tdSql.checkData(4,1,'chenhaoran02:6430')
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(4,1,'%s:6430'%self.host)
|
||||
tdSql.checkData(0,4,'ready')
|
||||
tdSql.checkData(4,4,'ready')
|
||||
tdSql.query("show mnodes;")
|
||||
tdSql.checkRows(1)
|
||||
tdSql.checkData(0,1,'chenhaoran02:6030')
|
||||
tdSql.checkData(0,1,'%s:6030'%self.host)
|
||||
tdSql.checkData(0,2,'leader')
|
||||
tdSql.checkData(0,3,'ready')
|
||||
|
||||
|
@ -255,15 +257,27 @@ class TDTestCase:
|
|||
|
||||
tdSql.query("show dnodes;")
|
||||
print(tdSql.queryResult)
|
||||
# stop and follower of mnode
|
||||
|
||||
tdLog.debug("stop and follower of mnode")
|
||||
self.TDDnodes.stoptaosd(2)
|
||||
self.check3mnode2off()
|
||||
self.TDDnodes.starttaosd(2)
|
||||
|
||||
self.TDDnodes.stoptaosd(3)
|
||||
self.check3mnode3off()
|
||||
self.TDDnodes.starttaosd(3)
|
||||
|
||||
self.TDDnodes.stoptaosd(1)
|
||||
self.check3mnode1off()
|
||||
self.TDDnodes.starttaosd(1)
|
||||
|
||||
# self.check3mnode()
|
||||
stopcount =0
|
||||
while stopcount <= 2:
|
||||
for i in range(dnodenumber):
|
||||
threads = []
|
||||
threads.append(thd.Thread(target=self.insert_data, args=(i*2,i*2+2)))
|
||||
# start_time = time.time()
|
||||
threads[0].start()
|
||||
# end_time = time.time()
|
||||
self.TDDnodes.stoptaosd(i+1)
|
||||
# if i == 1 :
|
||||
# self.check3mnode2off()
|
||||
|
@ -271,12 +285,15 @@ class TDTestCase:
|
|||
# self.check3mnode3off()
|
||||
# elif i == 0:
|
||||
# self.check3mnode1off()
|
||||
|
||||
self.TDDnodes.starttaosd(i+1)
|
||||
threads[0].join()
|
||||
|
||||
# self.check3mnode()
|
||||
stopcount+=1
|
||||
self.check3mnode()
|
||||
|
||||
|
||||
def getConnection(self, dnode):
|
||||
host = dnode.cfgDict["fqdn"]
|
||||
port = dnode.cfgDict["serverPort"]
|
||||
|
|
|
@ -0,0 +1,315 @@
|
|||
|
||||
import taos
|
||||
import sys
|
||||
import time
|
||||
import socket
|
||||
import os
|
||||
import threading
|
||||
from enum import Enum
|
||||
|
||||
from util.log import *
|
||||
from util.sql import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
|
||||
class actionType(Enum):
|
||||
CREATE_DATABASE = 0
|
||||
CREATE_STABLE = 1
|
||||
CREATE_CTABLE = 2
|
||||
INSERT_DATA = 3
|
||||
|
||||
class TDTestCase:
|
||||
hostname = socket.gethostname()
|
||||
#rpcDebugFlagVal = '143'
|
||||
#clientCfgDict = {'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||
#clientCfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||
#updatecfgDict = {'clientCfg': {}, 'serverPort': '', 'firstEp': '', 'secondEp':'', 'rpcDebugFlag':'135', 'fqdn':''}
|
||||
#updatecfgDict["rpcDebugFlag"] = rpcDebugFlagVal
|
||||
#print ("===================: ", updatecfgDict)
|
||||
|
||||
def init(self, conn, logSql):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
tdSql.init(conn.cursor())
|
||||
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||
|
||||
def getBuildPath(self):
|
||||
selfPath = os.path.dirname(os.path.realpath(__file__))
|
||||
|
||||
if ("community" in selfPath):
|
||||
projPath = selfPath[:selfPath.find("community")]
|
||||
else:
|
||||
projPath = selfPath[:selfPath.find("tests")]
|
||||
|
||||
for root, dirs, files in os.walk(projPath):
|
||||
if ("taosd" in files or "taosd.exe" in files):
|
||||
rootRealPath = os.path.dirname(os.path.realpath(root))
|
||||
if ("packaging" not in rootRealPath):
|
||||
buildPath = root[:len(root) - len("/build/bin")]
|
||||
break
|
||||
return buildPath
|
||||
|
||||
def newcur(self,cfg,host,port):
|
||||
user = "root"
|
||||
password = "taosdata"
|
||||
con=taos.connect(host=host, user=user, password=password, config=cfg ,port=port)
|
||||
cur=con.cursor()
|
||||
print(cur)
|
||||
return cur
|
||||
|
||||
def initConsumerTable(self,cdbName='cdb'):
|
||||
tdLog.info("create consume database, and consume info table, and consume result table")
|
||||
tdSql.query("create database if not exists %s vgroups 1"%(cdbName))
|
||||
# tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
# tdSql.query("drop table if exists %s.consumeresult "%(cdbName))
|
||||
|
||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
tdSql.query("create table %s.consumeresult (ts timestamp, consumerid int, consummsgcnt bigint, consumrowcnt bigint, checkresult int)"%cdbName)
|
||||
|
||||
def initConsumerInfoTable(self,cdbName='cdb'):
|
||||
tdLog.info("drop consumeinfo table")
|
||||
tdSql.query("drop table if exists %s.consumeinfo "%(cdbName))
|
||||
tdSql.query("create table %s.consumeinfo (ts timestamp, consumerid int, topiclist binary(1024), keylist binary(1024), expectmsgcnt bigint, ifcheckdata int, ifmanualcommit int)"%cdbName)
|
||||
|
||||
def insertConsumerInfo(self,consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifmanualcommit,cdbName='cdb'):
|
||||
sql = "insert into %s.consumeinfo values "%cdbName
|
||||
sql += "(now, %d, '%s', '%s', %d, %d, %d)"%(consumerId, topicList, keyList, expectrowcnt, ifcheckdata, ifmanualcommit)
|
||||
tdLog.info("consume info sql: %s"%sql)
|
||||
tdSql.query(sql)
|
||||
|
||||
def selectConsumeResult(self,expectRows,cdbName='cdb'):
|
||||
resultList=[]
|
||||
while 1:
|
||||
tdSql.query("select * from %s.consumeresult"%cdbName)
|
||||
#tdLog.info("row: %d, %l64d, %l64d"%(tdSql.getData(0, 1),tdSql.getData(0, 2),tdSql.getData(0, 3))
|
||||
if tdSql.getRows() == expectRows:
|
||||
break
|
||||
else:
|
||||
time.sleep(5)
|
||||
|
||||
for i in range(expectRows):
|
||||
tdLog.info ("consume id: %d, consume msgs: %d, consume rows: %d"%(tdSql.getData(i , 1), tdSql.getData(i , 2), tdSql.getData(i , 3)))
|
||||
resultList.append(tdSql.getData(i , 3))
|
||||
|
||||
return resultList
|
||||
|
||||
def startTmqSimProcess(self,buildPath,cfgPath,pollDelay,dbName,showMsg=1,showRow=1,cdbName='cdb',valgrind=0):
|
||||
if valgrind == 1:
|
||||
logFile = cfgPath + '/../log/valgrind-tmq.log'
|
||||
shellCmd = 'nohup valgrind --log-file=' + logFile
|
||||
shellCmd += '--tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all --num-callers=20 -v --workaround-gcc296-bugs=yes '
|
||||
|
||||
if (platform.system().lower() == 'windows'):
|
||||
shellCmd = 'mintty -h never -w hide ' + buildPath + '\\build\\bin\\tmq_sim.exe -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> nul 2>&1 &"
|
||||
else:
|
||||
shellCmd = 'nohup ' + buildPath + '/build/bin/tmq_sim -c ' + cfgPath
|
||||
shellCmd += " -y %d -d %s -g %d -r %d -w %s "%(pollDelay, dbName, showMsg, showRow, cdbName)
|
||||
shellCmd += "> /dev/null 2>&1 &"
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
|
||||
def create_database(self,tsql, dbName,dropFlag=1,vgroups=4,replica=1):
|
||||
if dropFlag == 1:
|
||||
tsql.execute("drop database if exists %s"%(dbName))
|
||||
|
||||
tsql.execute("create database if not exists %s vgroups %d replica %d"%(dbName, vgroups, replica))
|
||||
tdLog.debug("complete to create database %s"%(dbName))
|
||||
return
|
||||
|
||||
def create_stable(self,tsql, dbName,stbName):
|
||||
tsql.execute("create table if not exists %s.%s (ts timestamp, c1 bigint, c2 binary(16)) tags(t1 int)"%(dbName, stbName))
|
||||
tdLog.debug("complete to create %s.%s" %(dbName, stbName))
|
||||
return
|
||||
|
||||
def create_ctables(self,tsql, dbName,stbName,ctbNum):
|
||||
tsql.execute("use %s" %dbName)
|
||||
pre_create = "create table"
|
||||
sql = pre_create
|
||||
#tdLog.debug("doing create one stable %s and %d child table in %s ..." %(stbname, count ,dbname))
|
||||
for i in range(ctbNum):
|
||||
sql += " %s_%d using %s tags(%d)"%(stbName,i,stbName,i+1)
|
||||
if (i > 0) and (i%100 == 0):
|
||||
tsql.execute(sql)
|
||||
sql = pre_create
|
||||
if sql != pre_create:
|
||||
tsql.execute(sql)
|
||||
|
||||
tdLog.debug("complete to create %d child tables in %s.%s" %(ctbNum, dbName, stbName))
|
||||
return
|
||||
|
||||
def insert_data(self,tsql,dbName,stbName,ctbNum,rowsPerTbl,batchNum,startTs=0):
|
||||
tdLog.debug("start to insert data ............")
|
||||
tsql.execute("use %s" %dbName)
|
||||
pre_insert = "insert into "
|
||||
sql = pre_insert
|
||||
|
||||
if startTs == 0:
|
||||
t = time.time()
|
||||
startTs = int(round(t * 1000))
|
||||
|
||||
#tdLog.debug("doing insert data into stable:%s rows:%d ..."%(stbName, allRows))
|
||||
rowsOfSql = 0
|
||||
for i in range(ctbNum):
|
||||
sql += " %s_%d values "%(stbName,i)
|
||||
for j in range(rowsPerTbl):
|
||||
sql += "(%d, %d, 'tmqrow_%d') "%(startTs + j, j, j)
|
||||
rowsOfSql += 1
|
||||
if (j > 0) and ((rowsOfSql == batchNum) or (j == rowsPerTbl - 1)):
|
||||
tsql.execute(sql)
|
||||
rowsOfSql = 0
|
||||
if j < rowsPerTbl - 1:
|
||||
sql = "insert into %s_%d values " %(stbName,i)
|
||||
else:
|
||||
sql = "insert into "
|
||||
#end sql
|
||||
if sql != pre_insert:
|
||||
#print("insert sql:%s"%sql)
|
||||
tsql.execute(sql)
|
||||
tdLog.debug("insert data ............ [OK]")
|
||||
return
|
||||
|
||||
def prepareEnv(self, **parameterDict):
|
||||
# create new connector for my thread
|
||||
tsql=self.newcur(parameterDict['cfg'], 'localhost', 6030)
|
||||
|
||||
if parameterDict["actionType"] == actionType.CREATE_DATABASE:
|
||||
self.create_database(tsql, parameterDict["dbName"])
|
||||
elif parameterDict["actionType"] == actionType.CREATE_STABLE:
|
||||
self.create_stable(tsql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
elif parameterDict["actionType"] == actionType.CREATE_CTABLE:
|
||||
self.create_ctables(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
elif parameterDict["actionType"] == actionType.INSERT_DATA:
|
||||
self.insert_data(tsql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"],\
|
||||
parameterDict["rowsPerTbl"],parameterDict["batchNum"])
|
||||
else:
|
||||
tdLog.exit("not support's action: ", parameterDict["actionType"])
|
||||
|
||||
return
|
||||
|
||||
def tmqCase1(self, cfgPath, buildPath):
|
||||
'''
|
||||
Leave a TMQ process. Stop taosd, delete the data directory, restart taosd,
|
||||
and restart a consumption process to complete a consumption
|
||||
'''
|
||||
tdLog.printNoPrefix("======== test case 1: ")
|
||||
|
||||
self.initConsumerTable()
|
||||
|
||||
# create and start thread
|
||||
parameterDict = {'cfg': '', \
|
||||
'actionType': 0, \
|
||||
'dbName': 'db3', \
|
||||
'dropFlag': 1, \
|
||||
'vgroups': 4, \
|
||||
'replica': 1, \
|
||||
'stbName': 'stb1', \
|
||||
'ctbNum': 10, \
|
||||
'rowsPerTbl': 20000, \
|
||||
'batchNum': 100, \
|
||||
'startTs': 1640966400000} # 2022-01-01 00:00:00.000
|
||||
parameterDict['cfg'] = cfgPath
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
self.insert_data(tdSql,parameterDict["dbName"],parameterDict["stbName"],parameterDict["ctbNum"],parameterDict["rowsPerTbl"],parameterDict["batchNum"])
|
||||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
# expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
expectrowcnt = 90000000000
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 0
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:false,\
|
||||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 9000000 # Forever loop
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
time.sleep(3)
|
||||
tdLog.info("================= stop dnode, and remove data file, then start dnode ===========================")
|
||||
tdDnodes.stop(1)
|
||||
# time.sleep(5)
|
||||
dataPath = buildPath + "/../sim/dnode1/data/*"
|
||||
shellCmd = 'rm -rf ' + dataPath
|
||||
tdLog.info(shellCmd)
|
||||
os.system(shellCmd)
|
||||
tdDnodes.start(1)
|
||||
time.sleep(2)
|
||||
|
||||
######### redo to consume
|
||||
self.initConsumerTable()
|
||||
|
||||
self.create_database(tdSql, parameterDict["dbName"])
|
||||
self.create_stable(tdSql, parameterDict["dbName"], parameterDict["stbName"])
|
||||
self.create_ctables(tdSql, parameterDict["dbName"], parameterDict["stbName"], parameterDict["ctbNum"])
|
||||
self.insert_data(tdSql,parameterDict["dbName"],parameterDict["stbName"],parameterDict["ctbNum"],parameterDict["rowsPerTbl"],parameterDict["batchNum"])
|
||||
|
||||
tdLog.info("create topics from stb1")
|
||||
topicFromStb1 = 'topic_stb1'
|
||||
|
||||
tdSql.execute("create topic %s as select ts, c1, c2 from %s.%s" %(topicFromStb1, parameterDict['dbName'], parameterDict['stbName']))
|
||||
consumerId = 0
|
||||
expectrowcnt = parameterDict["rowsPerTbl"] * parameterDict["ctbNum"]
|
||||
topicList = topicFromStb1
|
||||
ifcheckdata = 0
|
||||
ifManualCommit = 0
|
||||
keyList = 'group.id:cgrp1,\
|
||||
enable.auto.commit:false,\
|
||||
auto.commit.interval.ms:6000,\
|
||||
auto.offset.reset:earliest'
|
||||
self.insertConsumerInfo(consumerId, expectrowcnt,topicList,keyList,ifcheckdata,ifManualCommit)
|
||||
|
||||
tdLog.info("start consume processor")
|
||||
pollDelay = 20
|
||||
showMsg = 1
|
||||
showRow = 1
|
||||
self.startTmqSimProcess(buildPath,cfgPath,pollDelay,parameterDict["dbName"],showMsg, showRow)
|
||||
|
||||
expectRows = 1
|
||||
resultList = self.selectConsumeResult(expectRows)
|
||||
totalConsumeRows = 0
|
||||
for i in range(expectRows):
|
||||
totalConsumeRows += resultList[i]
|
||||
|
||||
tdLog.info("act consume rows: %d, expect consume rows: %d"%(totalConsumeRows, expectrowcnt))
|
||||
if not (totalConsumeRows == expectrowcnt):
|
||||
tdLog.exit("tmq consume rows error!")
|
||||
|
||||
tdSql.query("drop topic %s"%topicFromStb1)
|
||||
os.system('pkill tmq_sim')
|
||||
|
||||
tdLog.printNoPrefix("======== test case 1 end ...... ")
|
||||
|
||||
def run(self):
|
||||
tdSql.prepare()
|
||||
|
||||
buildPath = self.getBuildPath()
|
||||
if (buildPath == ""):
|
||||
tdLog.exit("taosd not found!")
|
||||
else:
|
||||
tdLog.info("taosd found in %s" % buildPath)
|
||||
cfgPath = buildPath + "/../sim/psim/cfg"
|
||||
tdLog.info("cfgPath: %s" % cfgPath)
|
||||
|
||||
self.tmqCase1(cfgPath, buildPath)
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
event = threading.Event()
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
|
@ -116,3 +116,4 @@ python3 ./test.py -f 7-tmq/subscribeStb2.py
|
|||
python3 ./test.py -f 7-tmq/subscribeStb3.py
|
||||
python3 ./test.py -f 7-tmq/subscribeStb4.py
|
||||
python3 ./test.py -f 7-tmq/db.py
|
||||
python3 ./test.py -f 7-tmq/tmqError.py
|
||||
|
|
Loading…
Reference in New Issue