Merge pull request #28580 from taosdata/fix/TS-5251-add-conflict-check
fix/TS-5251-add-conflict-check
This commit is contained in:
commit
fb51956f0a
|
@ -1239,6 +1239,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
|
|||
|
||||
mndTransSetDbName(pTrans, pOld->name, NULL);
|
||||
TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
|
||||
|
||||
TAOS_CHECK_GOTO(mndSetAlterDbPrepareLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER);
|
||||
|
|
|
@ -867,6 +867,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
|
|||
if (pIter == NULL) break;
|
||||
|
||||
if (pNew->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||
|
||||
if (pNew->conflict == TRN_CONFLICT_DB) {
|
||||
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
|
||||
|
@ -874,6 +875,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
|
|||
mndTransLogConflict(pNew, pTrans, mndCheckStbConflict(pNew->stbname, pTrans), &conflict);
|
||||
}
|
||||
}
|
||||
|
||||
if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) {
|
||||
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||
if (pTrans->conflict == TRN_CONFLICT_DB) {
|
||||
|
@ -885,22 +887,6 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
|
|||
}
|
||||
}
|
||||
|
||||
// if (pNew->conflict == TRN_CONFLICT_TOPIC) {
|
||||
// if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||
// if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
|
||||
// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
|
||||
// }
|
||||
// }
|
||||
// if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
|
||||
// if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||
// if (pTrans->conflict == TRN_CONFLICT_TOPIC) {
|
||||
// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
|
||||
// }
|
||||
// if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
|
||||
// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0)
|
||||
// conflict = true;
|
||||
// }
|
||||
// }
|
||||
if (pNew->conflict == TRN_CONFLICT_ARBGROUP) {
|
||||
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
|
||||
if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) {
|
||||
|
@ -963,8 +949,10 @@ int32_t mndTransCheckConflictWithCompact(SMnode *pMnode, STrans *pTrans) {
|
|||
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT, pIter, (void **)&pCompact);
|
||||
if (pIter == NULL) break;
|
||||
|
||||
if (pTrans->conflict == TRN_CONFLICT_GLOBAL || pTrans->conflict == TRN_CONFLICT_DB ||
|
||||
pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
|
||||
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) {
|
||||
thisConflict = true;
|
||||
}
|
||||
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
|
||||
if (strcasecmp(pTrans->dbname, pCompact->dbname) == 0) thisConflict = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -3193,6 +3193,7 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
|
|||
mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
|
||||
|
||||
mndTransSetDbName(pTrans, pDb->name, NULL);
|
||||
TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
|
||||
|
||||
SVgObj newVg1 = {0};
|
||||
memcpy(&newVg1, pVgroup, sizeof(SVgObj));
|
||||
|
@ -3441,6 +3442,8 @@ static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
|
|||
}
|
||||
mndTransSetSerial(pTrans);
|
||||
mInfo("trans:%d, used to balance vgroup", pTrans->id);
|
||||
TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
|
||||
|
||||
while (1) {
|
||||
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
|
||||
|
|
|
@ -97,6 +97,11 @@ class TDTestCase(TBase):
|
|||
self.alterReplica(1)
|
||||
self.checkAggCorrect()
|
||||
self.compactDb()
|
||||
|
||||
if self.waitCompactsZero() is False:
|
||||
tdLog.exit(f"compact not finished")
|
||||
return False
|
||||
|
||||
self.alterReplica3()
|
||||
|
||||
vgids = self.getVGroup(self.db)
|
||||
|
|
|
@ -283,6 +283,18 @@ class TBase:
|
|||
time.sleep(interval)
|
||||
|
||||
return False
|
||||
def waitCompactsZero(self, seconds = 300, interval = 1):
|
||||
# wait end
|
||||
for i in range(seconds):
|
||||
sql ="show compacts;"
|
||||
rows = tdSql.query(sql)
|
||||
if rows == 0:
|
||||
tdLog.info("compacts count became zero.")
|
||||
return True
|
||||
#tdLog.info(f"i={i} wait ...")
|
||||
time.sleep(interval)
|
||||
|
||||
return False
|
||||
|
||||
# check file exist
|
||||
def checkFileExist(self, pathFile):
|
||||
|
|
|
@ -753,6 +753,7 @@
|
|||
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas.py -N 4 -M 1
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys.py -N 4 -M 1
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups.py -N 4 -M 1
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/compactDBConflict.py -N 3
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distinct.py -Q 2
|
||||
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/varchar.py -Q 2
|
||||
|
|
|
@ -0,0 +1,204 @@
|
|||
###################################################################
|
||||
# Copyright (c) 2016 by TAOS Technologies, Inc.
|
||||
# All rights reserved.
|
||||
#
|
||||
# This file is proprietary and confidential to TAOS Technologies.
|
||||
# No part of this file may be reproduced, stored, transmitted,
|
||||
# disclosed or used in any form or by any means other than as
|
||||
# expressly provided by the written permission from Jianhui Tao
|
||||
#
|
||||
###################################################################
|
||||
|
||||
# -*- coding: utf-8 -*-
|
||||
from util.log import *
|
||||
from util.cases import *
|
||||
from util.dnodes import *
|
||||
from util.sql import *
|
||||
from util.common import tdCom
|
||||
import threading
|
||||
|
||||
|
||||
class TDTestCase:
|
||||
def init(self, conn, logSql, replicaVar=1):
|
||||
tdLog.debug(f"start to init {__file__}")
|
||||
self.replicaVar = int(replicaVar)
|
||||
tdSql.init(conn.cursor(), logSql) # output sql.txt file
|
||||
|
||||
def run(self):
|
||||
tdLog.debug(f"start to excute {__file__}")
|
||||
|
||||
tdSql.execute('CREATE DATABASE db vgroups 4 replica 1;')
|
||||
|
||||
tdSql.execute('use db;')
|
||||
|
||||
tdLog.debug("start test1")
|
||||
event = threading.Event()
|
||||
newTdSql=tdCom.newTdSql()
|
||||
t0 = threading.Thread(target=self.compactDBThread, args=('', event, newTdSql))
|
||||
t0.start()
|
||||
tdLog.info("t0 threading started,wait compact db tran start")
|
||||
event.wait()
|
||||
tdSql.error('ALTER DATABASE db REPLICA 3;', expectErrInfo="Transaction not completed due to conflict with compact")
|
||||
tdLog.info("wait compact db finish")
|
||||
t0.join()
|
||||
|
||||
tdLog.debug("start test2")
|
||||
event1 = threading.Event()
|
||||
newTdSql1=tdCom.newTdSql()
|
||||
t1 = threading.Thread(target=self.compactDBThread, args=('', event1, newTdSql1))
|
||||
t1.start()
|
||||
tdLog.info("t1 threading started,wait compact db tran start")
|
||||
event1.wait()
|
||||
tdSql.error('REDISTRIBUTE VGROUP 5 DNODE 1;', expectErrInfo="Transaction not completed due to conflict with compact")
|
||||
tdLog.info("wait compact db finish")
|
||||
t1.join()
|
||||
|
||||
tdLog.debug("start test3")
|
||||
event2 = threading.Event()
|
||||
newTdSql2=tdCom.newTdSql()
|
||||
t2 = threading.Thread(target=self.compactDBThread, args=('', event2, newTdSql2))
|
||||
t2.start()
|
||||
tdLog.info("t2 threading started,wait compact db tran start")
|
||||
event2.wait()
|
||||
rowLen = tdSql.query('show vgroups')
|
||||
if rowLen > 0:
|
||||
vgroupId = tdSql.getData(0, 0)
|
||||
tdLog.info(f"split Vgroup vgroupId:{vgroupId} start")
|
||||
tdSql.error(f"split vgroup {vgroupId}", expectErrInfo="Transaction not completed due to conflict with compact")
|
||||
else:
|
||||
tdLog.exit("get vgroupId fail!")
|
||||
tdLog.info("wait compact db finish")
|
||||
t2.join()
|
||||
|
||||
tdLog.debug("start test4")
|
||||
event3 = threading.Event()
|
||||
newTdSql3=tdCom.newTdSql()
|
||||
t3 = threading.Thread(target=self.compactDBThread, args=('', event3, newTdSql3))
|
||||
t3.start()
|
||||
tdLog.info("t3 threading started!!!!!")
|
||||
event3.wait()
|
||||
tdSql.error('BALANCE VGROUP;', expectErrInfo="Transaction not completed due to conflict with compact")
|
||||
t3.join()
|
||||
|
||||
tdLog.debug("start test5")
|
||||
newTdSql4=tdCom.newTdSql()
|
||||
t4 = threading.Thread(target=self.splitVgroupThread, args=('', newTdSql4))
|
||||
t4.start()
|
||||
tdLog.info("t4 threading started!!!!!")
|
||||
time.sleep(1)
|
||||
tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed")
|
||||
t4.join()
|
||||
|
||||
tdLog.debug("start test6")
|
||||
newTdSql5=tdCom.newTdSql()
|
||||
t5 = threading.Thread(target=self.RedistributeVGroups, args=('', newTdSql5))
|
||||
t5.start()
|
||||
tdLog.info("t5 threading started!!!!!")
|
||||
time.sleep(1)
|
||||
tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed")
|
||||
t5.join()
|
||||
|
||||
tdLog.debug("start test7")
|
||||
newTdSql6=tdCom.newTdSql()
|
||||
t6 = threading.Thread(target=self.balanceVGROUPThread, args=('', newTdSql6))
|
||||
t6.start()
|
||||
tdLog.info("t6 threading started!!!!!")
|
||||
time.sleep(1)
|
||||
tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed")
|
||||
t6.join()
|
||||
|
||||
tdLog.debug("start test8")
|
||||
newTdSql7=tdCom.newTdSql()
|
||||
t7 = threading.Thread(target=self.alterDBThread, args=('', newTdSql7))
|
||||
t7.start()
|
||||
tdLog.info("t7 threading started!!!!!")
|
||||
time.sleep(1)
|
||||
tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed")
|
||||
t7.join()
|
||||
|
||||
|
||||
def compactDBThread(self, p, event, newtdSql):
|
||||
tdLog.info("compact db start")
|
||||
newtdSql.execute('compact DATABASE db')
|
||||
event.set()
|
||||
if self.waitCompactsZero(atdSql=newtdSql) is False:
|
||||
tdLog.info(f"compact not finished")
|
||||
|
||||
def alterDBThread(self, p, newtdSql):
|
||||
tdLog.info("alter db start")
|
||||
newtdSql.execute('ALTER DATABASE db REPLICA 3')
|
||||
if self.waitTransactionZero(atdSql=newtdSql) is False:
|
||||
tdLog.info(f"transaction not finished")
|
||||
|
||||
def balanceVGROUPThread(self, p, newtdSql):
|
||||
tdLog.info("balance VGROUP start")
|
||||
newtdSql.execute('BALANCE VGROUP')
|
||||
if self.waitTransactionZero(atdSql=newtdSql) is False:
|
||||
tdLog.info(f"transaction not finished")
|
||||
|
||||
def RedistributeVGroups(self, p, newtdSql):
|
||||
tdLog.info("REDISTRIBUTE VGROUP start")
|
||||
sql = f"REDISTRIBUTE VGROUP 5 DNODE 1"
|
||||
newtdSql.execute(sql, show=True)
|
||||
if self.waitTransactionZero(atdSql=newtdSql) is False:
|
||||
tdLog.exit(f"{sql} transaction not finished")
|
||||
return False
|
||||
|
||||
sql = f"REDISTRIBUTE VGROUP 4 DNODE 1"
|
||||
newtdSql.execute(sql, show=True)
|
||||
if self.waitTransactionZero(atdSql=newtdSql) is False:
|
||||
tdLog.exit(f"{sql} transaction not finished")
|
||||
return False
|
||||
|
||||
sql = f"REDISTRIBUTE VGROUP 3 DNODE 1"
|
||||
newtdSql.execute(sql, show=True)
|
||||
if self.waitTransactionZero(atdSql=newtdSql) is False:
|
||||
tdLog.exit(f"{sql} transaction not finished")
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def splitVgroupThread(self, p, newtdSql):
|
||||
newtdSql.execute(f"use db;")
|
||||
rowLen = newtdSql.query('show vgroups')
|
||||
if rowLen > 0:
|
||||
vgroupId = newtdSql.getData(0, 0)
|
||||
tdLog.info(f"splitVgroupThread vgroupId:{vgroupId} start")
|
||||
newtdSql.execute(f"split vgroup {vgroupId}")
|
||||
else:
|
||||
tdLog.exit("get vgroupId fail!")
|
||||
if self.waitTransactionZero(atdSql=newtdSql) is False:
|
||||
tdLog.info(f"transaction not finished")
|
||||
|
||||
def waitTransactionZero(self, atdSql, seconds = 300, interval = 1):
|
||||
# wait end
|
||||
for i in range(seconds):
|
||||
sql ="show transactions;"
|
||||
rows = atdSql.query(sql)
|
||||
if rows == 0:
|
||||
tdLog.info("transaction count became zero.")
|
||||
return True
|
||||
#tdLog.info(f"i={i} wait ...")
|
||||
time.sleep(interval)
|
||||
|
||||
return False
|
||||
def waitCompactsZero(self, atdSql, seconds = 300, interval = 1):
|
||||
# wait end
|
||||
for i in range(seconds):
|
||||
sql ="show compacts;"
|
||||
rows = atdSql.query(sql)
|
||||
if rows == 0:
|
||||
tdLog.info("compacts count became zero.")
|
||||
return True
|
||||
#tdLog.info(f"i={i} wait ...")
|
||||
time.sleep(interval)
|
||||
|
||||
return False
|
||||
|
||||
def stop(self):
|
||||
tdSql.close()
|
||||
tdLog.success(f"{__file__} successfully executed")
|
||||
|
||||
|
||||
tdCases.addLinux(__file__, TDTestCase())
|
||||
tdCases.addWindows(__file__, TDTestCase())
|
Loading…
Reference in New Issue