Merge branch '3.0' into merge/mainto3.0

This commit is contained in:
Shengliang Guan 2024-11-28 19:22:17 +08:00
commit a3f077ce99
9 changed files with 277 additions and 76 deletions

View File

@ -1239,6 +1239,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
mndTransSetDbName(pTrans, pOld->name, NULL);
TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
TAOS_CHECK_GOTO(mndSetAlterDbPrepareLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER);
TAOS_CHECK_GOTO(mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER);

View File

@ -867,6 +867,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
if (pIter == NULL) break;
if (pNew->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pNew->conflict == TRN_CONFLICT_DB) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
@ -874,6 +875,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
mndTransLogConflict(pNew, pTrans, mndCheckStbConflict(pNew->stbname, pTrans), &conflict);
}
}
if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_DB) {
@ -885,22 +887,6 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
}
}
// if (pNew->conflict == TRN_CONFLICT_TOPIC) {
// if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
// if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
// }
// }
// if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
// if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
// if (pTrans->conflict == TRN_CONFLICT_TOPIC) {
// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true;
// }
// if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) {
// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0)
// conflict = true;
// }
// }
if (pNew->conflict == TRN_CONFLICT_ARBGROUP) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true;
if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) {
@ -963,8 +949,10 @@ int32_t mndTransCheckConflictWithCompact(SMnode *pMnode, STrans *pTrans) {
pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT, pIter, (void **)&pCompact);
if (pIter == NULL) break;
if (pTrans->conflict == TRN_CONFLICT_GLOBAL || pTrans->conflict == TRN_CONFLICT_DB ||
pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (pTrans->conflict == TRN_CONFLICT_GLOBAL) {
thisConflict = true;
}
if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) {
if (strcasecmp(pTrans->dbname, pCompact->dbname) == 0) thisConflict = true;
}

View File

@ -3193,6 +3193,7 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId);
mndTransSetDbName(pTrans, pDb->name, NULL);
TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
SVgObj newVg1 = {0};
memcpy(&newVg1, pVgroup, sizeof(SVgObj));
@ -3441,6 +3442,8 @@ static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) {
}
mndTransSetSerial(pTrans);
mInfo("trans:%d, used to balance vgroup", pTrans->id);
TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER);
TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER);
while (1) {
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);

View File

@ -371,7 +371,8 @@ static int32_t walLogEntriesComplete(const SWal* pWal) {
}
static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
int32_t code = 0;
int32_t code = TSDB_CODE_SUCCESS;
TdFilePtr pFile = NULL;
SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx);
if (!pFileInfo) {
TAOS_RETURN(TSDB_CODE_FAILED);
@ -384,7 +385,7 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
if (taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0) {
wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
code = terrno;
TAOS_RETURN(code);
goto _exit;
}
int64_t records = TMAX(0, pFileInfo->lastVer - pFileInfo->firstVer + 1);
int64_t lastEndOffset = records * sizeof(SWalIdxEntry);
@ -393,9 +394,10 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE);
if (pFile == NULL) {
TAOS_RETURN(terrno);
code = terrno;
goto _exit;
}
wInfo("vgId:%d, trim idx file. file: %s, size: %" PRId64 ", offset: %" PRId64, pWal->cfg.vgId, fnameStr, fileSize,
@ -404,11 +406,12 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) {
code = taosFtruncateFile(pFile, lastEndOffset);
if (code < 0) {
wError("vgId:%d, failed to truncate file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr);
TAOS_RETURN(code);
goto _exit;
}
(void)taosCloseFile(&pFile);
TAOS_RETURN(TSDB_CODE_SUCCESS);
_exit:
(void)taosCloseFile(&pFile);
TAOS_RETURN(code);
}
static void printFileSet(int32_t vgId, SArray* fileSet, const char* str) {

View File

@ -160,12 +160,13 @@ static int64_t walChangeWrite(SWal *pWal, int64_t ver) {
int32_t walRollback(SWal *pWal, int64_t ver) {
TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex));
wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver);
int32_t code = 0;
int64_t ret;
char fnameStr[WAL_FILE_LEN];
TdFilePtr pIdxFile = NULL, pLogFile = NULL;
if (ver > pWal->vers.lastVer || ver <= pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER);
code = TSDB_CODE_WAL_INVALID_VER;
goto _exit;
}
// find correct file
@ -173,9 +174,8 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
// change current files
ret = walChangeWrite(pWal, ver);
if (ret < 0) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
code = terrno;
goto _exit;
}
// delete files in descending order
@ -197,98 +197,81 @@ int32_t walRollback(SWal *pWal, int64_t ver) {
}
walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TAOS_UNUSED(taosCloseFile(&pWal->pIdxFile));
TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
if (pIdxFile == NULL) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
code = terrno;
goto _exit;
}
int64_t idxOff = walGetVerIdxOffset(pWal, ver);
ret = taosLSeekFile(pIdxFile, idxOff, SEEK_SET);
if (ret < 0) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
code = terrno;
goto _exit;
}
// read idx file and get log file pos
SWalIdxEntry entry;
if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
code = terrno;
goto _exit;
}
walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr);
TAOS_UNUSED(taosCloseFile(&pWal->pLogFile));
TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND);
wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, fnameStr);
if (pLogFile == NULL) {
// TODO
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
code = terrno;
goto _exit;
}
ret = taosLSeekFile(pLogFile, entry.offset, SEEK_SET);
if (ret < 0) {
// TODO
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
code = terrno;
goto _exit;
}
// validate offset
SWalCkHead head;
int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead));
if (size != sizeof(SWalCkHead)) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(terrno);
code = terrno;
goto _exit;
}
int32_t code = walValidHeadCksum(&head);
code = walValidHeadCksum(&head);
if (code != 0) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
code = TSDB_CODE_WAL_FILE_CORRUPTED;
goto _exit;
}
if (head.head.version != ver) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED);
code = TSDB_CODE_WAL_FILE_CORRUPTED;
goto _exit;
}
// truncate old files
code = taosFtruncateFile(pLogFile, entry.offset);
if (code < 0) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(code);
goto _exit;
}
code = taosFtruncateFile(pIdxFile, idxOff);
if (code < 0) {
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(code);
goto _exit;
}
pWal->vers.lastVer = ver - 1;
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1;
((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset;
TAOS_UNUSED(taosCloseFile(&pIdxFile));
TAOS_UNUSED(taosCloseFile(&pLogFile));
code = walSaveMeta(pWal);
if (code < 0) {
wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, terrstr());
goto _exit;
}
_exit:
TAOS_UNUSED(taosCloseFile(&pIdxFile));
TAOS_UNUSED(taosCloseFile(&pLogFile));
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(code);
}
// unlock
TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex));
TAOS_RETURN(TSDB_CODE_SUCCESS);
}
static int32_t walRollImpl(SWal *pWal) {
@ -718,6 +701,7 @@ static int32_t walInitWriteFile(SWal *pWal) {
walBuildLogName(pWal, fileFirstVer, fnameStr);
pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND);
if (pLogTFile == NULL) {
TAOS_UNUSED(taosCloseFile(&pIdxTFile));
TAOS_RETURN(terrno);
}
// switch file

View File

@ -97,6 +97,11 @@ class TDTestCase(TBase):
self.alterReplica(1)
self.checkAggCorrect()
self.compactDb()
if self.waitCompactsZero() is False:
tdLog.exit(f"compact not finished")
return False
self.alterReplica3()
vgids = self.getVGroup(self.db)

View File

@ -283,6 +283,18 @@ class TBase:
time.sleep(interval)
return False
def waitCompactsZero(self, seconds = 300, interval = 1):
    """Poll ``show compacts;`` until it reports zero rows.

    Up to ``seconds`` polls are issued, sleeping ``interval`` between
    unsuccessful ones, so the worst-case wait is ``seconds * interval``.

    Returns True as soon as a poll reports zero rows, False once the
    polls are used up.
    """
    for _ in range(seconds):
        remaining = tdSql.query("show compacts;")
        if remaining != 0:
            # still compacting: back off and poll again
            time.sleep(interval)
            continue
        tdLog.info("compacts count became zero.")
        return True
    return False
# check file exist
def checkFileExist(self, pathFile):

View File

@ -755,6 +755,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas.py -N 4 -M 1
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys.py -N 4 -M 1
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups.py -N 4 -M 1
,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/compactDBConflict.py -N 3
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distinct.py -Q 2
,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/varchar.py -Q 2

View File

@ -0,0 +1,204 @@
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
from util.log import *
from util.cases import *
from util.dnodes import *
from util.sql import *
from util.common import tdCom
import threading
class TDTestCase:
    """System test: COMPACT DATABASE versus other vgroup/db operations.

    Tests 1-4 start a compact on ``db`` and then expect ALTER DATABASE,
    REDISTRIBUTE VGROUP, SPLIT VGROUP and BALANCE VGROUP to be rejected.
    Tests 5-8 start those operations in a background thread and expect a
    concurrent compact to be rejected instead.
    """

    # Error text expected when a statement collides with a running compact.
    _ERR_COMPACT_CONFLICT = "Transaction not completed due to conflict with compact"
    # Error text expected when a compact collides with a running transaction.
    _ERR_TRANS_CONFLICT = "Conflict transaction not completed"

    def init(self, conn, logSql, replicaVar=1):
        """Store the replica setting and bind the shared tdSql to ``conn``."""
        tdLog.debug(f"start to init {__file__}")
        self.replicaVar = int(replicaVar)
        tdSql.init(conn.cursor(), logSql)  # output sql.txt file

    def run(self):
        """Create ``db`` and run the eight conflict scenarios in order."""
        tdLog.debug(f"start to excute {__file__}")

        tdSql.execute('CREATE DATABASE db vgroups 4 replica 1;')
        tdSql.execute('use db;')

        tdLog.debug("start test1")
        t0 = self._startCompactThread("t0 threading started,wait compact db tran start")
        tdSql.error('ALTER DATABASE db REPLICA 3;', expectErrInfo=self._ERR_COMPACT_CONFLICT)
        tdLog.info("wait compact db finish")
        t0.join()

        tdLog.debug("start test2")
        t1 = self._startCompactThread("t1 threading started,wait compact db tran start")
        # NOTE(review): assumes vgroup id 5 exists in this cluster - confirm.
        tdSql.error('REDISTRIBUTE VGROUP 5 DNODE 1;', expectErrInfo=self._ERR_COMPACT_CONFLICT)
        tdLog.info("wait compact db finish")
        t1.join()

        tdLog.debug("start test3")
        t2 = self._startCompactThread("t2 threading started,wait compact db tran start")
        rowLen = tdSql.query('show vgroups')
        if rowLen > 0:
            vgroupId = tdSql.getData(0, 0)
            tdLog.info(f"split Vgroup vgroupId:{vgroupId} start")
            tdSql.error(f"split vgroup {vgroupId}", expectErrInfo=self._ERR_COMPACT_CONFLICT)
        else:
            tdLog.exit("get vgroupId fail!")
        tdLog.info("wait compact db finish")
        t2.join()

        tdLog.debug("start test4")
        t3 = self._startCompactThread("t3 threading started!!!!!")
        tdSql.error('BALANCE VGROUP;', expectErrInfo=self._ERR_COMPACT_CONFLICT)
        t3.join()

        # Tests 5-8 share one shape: start the operation in the background,
        # then check that a compact issued meanwhile is refused.
        reverseCases = (
            ("start test5", self.splitVgroupThread, "t4 threading started!!!!!"),
            ("start test6", self.RedistributeVGroups, "t5 threading started!!!!!"),
            ("start test7", self.balanceVGROUPThread, "t6 threading started!!!!!"),
            ("start test8", self.alterDBThread, "t7 threading started!!!!!"),
        )
        for banner, target, startedMsg in reverseCases:
            tdLog.debug(banner)
            worker = self._startWorker(target, startedMsg)
            tdSql.error('compact database db;', expectErrInfo=self._ERR_TRANS_CONFLICT)
            worker.join()

    def _startCompactThread(self, startedMsg):
        """Start compactDBThread on a fresh connection and block until it
        signals that the compact statement has been issued.

        Returns the started thread; the caller is responsible for join().
        """
        event = threading.Event()
        worker = threading.Thread(target=self.compactDBThread,
                                  args=('', event, tdCom.newTdSql()))
        worker.start()
        tdLog.info(startedMsg)
        event.wait()
        return worker

    def _startWorker(self, target, startedMsg):
        """Start ``target('', <new connection>)`` in a thread and sleep one
        second before returning it.

        NOTE(review): the fixed sleep presumably gives the background
        transaction time to start; there is no explicit handshake.
        """
        worker = threading.Thread(target=target, args=('', tdCom.newTdSql()))
        worker.start()
        tdLog.info(startedMsg)
        time.sleep(1)
        return worker

    def compactDBThread(self, p, event, newtdSql):
        """Issue ``compact DATABASE db``, signal ``event``, then wait for
        ``show compacts`` to drain.

        ``p`` is unused. ``event`` is set even when the compact statement
        raises, so a thread blocked in ``event.wait()`` is always released;
        the exception still propagates out of this method.
        """
        tdLog.info("compact db start")
        try:
            newtdSql.execute('compact DATABASE db')
        finally:
            # Without this, a failing compact would leave run() stuck forever.
            event.set()
        if self.waitCompactsZero(atdSql=newtdSql) is False:
            tdLog.info("compact not finished")

    def alterDBThread(self, p, newtdSql):
        """Run ``ALTER DATABASE db REPLICA 3`` and wait for transactions to drain."""
        tdLog.info("alter db start")
        newtdSql.execute('ALTER DATABASE db REPLICA 3')
        if self.waitTransactionZero(atdSql=newtdSql) is False:
            tdLog.info("transaction not finished")

    def balanceVGROUPThread(self, p, newtdSql):
        """Run ``BALANCE VGROUP`` and wait for transactions to drain."""
        tdLog.info("balance VGROUP start")
        newtdSql.execute('BALANCE VGROUP')
        if self.waitTransactionZero(atdSql=newtdSql) is False:
            tdLog.info("transaction not finished")

    def RedistributeVGroups(self, p, newtdSql):
        """Redistribute vgroups 5, 4 and 3 to dnode 1, one after another.

        Each statement must see the transaction list drain before the next
        one is issued. Returns True when all three finished, False when one
        did not (after reporting it through tdLog.exit).
        """
        tdLog.info("REDISTRIBUTE VGROUP start")
        # NOTE(review): the ids are hard-coded; assumes the 4-vgroup database
        # created in run() owns vgroups 3..5 - confirm.
        for vgId in (5, 4, 3):
            sql = f"REDISTRIBUTE VGROUP {vgId} DNODE 1"
            newtdSql.execute(sql, show=True)
            if self.waitTransactionZero(atdSql=newtdSql) is False:
                tdLog.exit(f"{sql} transaction not finished")
                return False
        return True

    def splitVgroupThread(self, p, newtdSql):
        """Split the first vgroup listed by ``show vgroups`` and wait for
        transactions to drain."""
        newtdSql.execute("use db;")
        rowLen = newtdSql.query('show vgroups')
        if rowLen > 0:
            vgroupId = newtdSql.getData(0, 0)
            tdLog.info(f"splitVgroupThread vgroupId:{vgroupId} start")
            newtdSql.execute(f"split vgroup {vgroupId}")
        else:
            tdLog.exit("get vgroupId fail!")
        if self.waitTransactionZero(atdSql=newtdSql) is False:
            tdLog.info("transaction not finished")

    def _waitRowsZero(self, atdSql, sql, doneMsg, seconds, interval):
        """Poll ``sql`` through ``atdSql.query`` until it reports zero rows.

        At most ``seconds`` polls are made, sleeping ``interval`` after each
        non-zero result. Returns True on the first zero result (after logging
        ``doneMsg``), False when the polls are exhausted.
        """
        for _ in range(seconds):
            if atdSql.query(sql) == 0:
                tdLog.info(doneMsg)
                return True
            time.sleep(interval)
        return False

    def waitTransactionZero(self, atdSql, seconds = 300, interval = 1):
        """Wait until ``show transactions;`` returns no rows; see _waitRowsZero."""
        return self._waitRowsZero(atdSql, "show transactions;",
                                  "transaction count became zero.", seconds, interval)

    def waitCompactsZero(self, atdSql, seconds = 300, interval = 1):
        """Wait until ``show compacts;`` returns no rows; see _waitRowsZero."""
        return self._waitRowsZero(atdSql, "show compacts;",
                                  "compacts count became zero.", seconds, interval)

    def stop(self):
        """Close the shared connection and report success."""
        tdSql.close()
        tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())