From 3b23031a44216abd987d4aecb7e96a0a64194367 Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 31 Oct 2024 01:18:00 +0000 Subject: [PATCH 01/25] fix/TS-5251-add-conflict-check --- source/dnode/mnode/impl/src/mndDb.c | 1 + source/dnode/mnode/impl/src/mndTrans.c | 18 ++---------------- source/dnode/mnode/impl/src/mndVgroup.c | 2 ++ 3 files changed, 5 insertions(+), 16 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index aed00af3c1..b16cfe410a 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -1219,6 +1219,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p mndTransSetDbName(pTrans, pOld->name, NULL); TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER); + TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER); TAOS_CHECK_GOTO(mndSetAlterDbPrepareLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER); TAOS_CHECK_GOTO(mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER); diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 4268d73746..de8b1a243b 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -867,6 +867,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { if (pIter == NULL) break; if (pNew->conflict == TRN_CONFLICT_GLOBAL) conflict = true; + if (pNew->conflict == TRN_CONFLICT_DB) { if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { @@ -874,6 +875,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { mndTransLogConflict(pNew, pTrans, mndCheckStbConflict(pNew->stbname, pTrans), &conflict); } } + if (pNew->conflict == TRN_CONFLICT_DB_INSIDE) { if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pTrans->conflict == TRN_CONFLICT_DB) { @@ -885,22 +887,6 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) { } } -// if (pNew->conflict == TRN_CONFLICT_TOPIC) { -// if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; -// if (pTrans->conflict == TRN_CONFLICT_TOPIC || pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { -// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; -// } -// } -// if (pNew->conflict == TRN_CONFLICT_TOPIC_INSIDE) { -// if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; -// if (pTrans->conflict == TRN_CONFLICT_TOPIC) { -// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0) conflict = true; -// } -// if (pTrans->conflict == TRN_CONFLICT_TOPIC_INSIDE) { -// if (strcasecmp(pNew->dbname, pTrans->dbname) == 0 && strcasecmp(pNew->stbname, pTrans->stbname) == 0) -// conflict = true; -// } -// } if (pNew->conflict == TRN_CONFLICT_ARBGROUP) { if (pTrans->conflict == TRN_CONFLICT_GLOBAL) conflict = true; if (pTrans->conflict == TRN_CONFLICT_ARBGROUP) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 5a79ac6bc8..4152232396 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -3424,6 +3424,8 @@ static int32_t mndBalanceVgroup(SMnode *pMnode, SRpcMsg *pReq, SArray *pArray) { } mndTransSetSerial(pTrans); mInfo("trans:%d, used to balance vgroup", pTrans->id); + TAOS_CHECK_GOTO(mndTransCheckConflict(pMnode, pTrans), NULL, _OVER); + TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER); while (1) { taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes); From cea72bb3650cb1c8c46e8f6f3d2d44d2e19d57af Mon Sep 17 00:00:00 2001 From: dmchen Date: Thu, 31 Oct 2024 08:29:11 +0000 Subject: [PATCH 02/25] fix/TS-5251-add-conflict-check-fix-case --- tests/army/cluster/snapshot.py | 5 +++++ tests/army/frame/caseBase.py | 12 ++++++++++++ 2 files changed, 17 insertions(+) diff --git a/tests/army/cluster/snapshot.py b/tests/army/cluster/snapshot.py index d18dbf2796..d14887db5c 100644 --- a/tests/army/cluster/snapshot.py +++ b/tests/army/cluster/snapshot.py @@ -97,6 +97,11 @@ class TDTestCase(TBase): self.alterReplica(1) self.checkAggCorrect() self.compactDb() + + if self.waitCompactsZero() is False: + tdLog.exit(f"compact not finished") + return False + self.alterReplica3() vgids = self.getVGroup(self.db) diff --git a/tests/army/frame/caseBase.py b/tests/army/frame/caseBase.py index 491e432df7..c2b3411e77 100644 --- a/tests/army/frame/caseBase.py +++ b/tests/army/frame/caseBase.py @@ -283,6 +283,18 @@ class TBase: time.sleep(interval) return False + def waitCompactsZero(self, seconds = 300, interval = 1): + # wait end + for i in range(seconds): + sql ="show compacts;" + rows = tdSql.query(sql) + if rows == 0: + tdLog.info("compacts count became zero.") + return True + #tdLog.info(f"i={i} wait ...") + time.sleep(interval) + + return False # check file exist def checkFileExist(self, pathFile): From aea910b8793d1ad60e8a0bd998e0102f26f87557 Mon Sep 17 00:00:00 2001 From: dmchen Date: Tue, 5 Nov 2024 19:16:40 +0800 Subject: [PATCH 03/25] fix/TS-5251-add-conflict-check-add-test --- tests/army/cluster/compactDBConflict.py | 147 ++++++++++++++++++++++++ 1 file changed, 147 insertions(+) create mode 100644 tests/army/cluster/compactDBConflict.py diff --git a/tests/army/cluster/compactDBConflict.py b/tests/army/cluster/compactDBConflict.py new file mode 100644 index 0000000000..f02272cc1f --- /dev/null +++ b/tests/army/cluster/compactDBConflict.py @@ -0,0 +1,147 @@ +################################################################### +# Copyright (c) 2016 by TAOS Technologies, Inc. +# All rights reserved. +# +# This file is proprietary and confidential to TAOS Technologies. +# No part of this file may be reproduced, stored, transmitted, +# disclosed or used in any form or by any means other than as +# expressly provided by the written permission from Jianhui Tao +# +################################################################### + +# -*- coding: utf-8 -*- +import frame.etool +from frame.caseBase import * +from frame.cases import * +from frame import * +import json +import threading + + +class TDTestCase(TBase): + def init(self, conn, logSql, replicaVar=1): + tdLog.debug(f"start to init {__file__}") + self.replicaVar = int(replicaVar) + tdSql.init(conn.cursor(), logSql) # output sql.txt file + self.configJsonFile('splitVgroupByLearner.json', 'db', 4, 1, 'splitVgroupByLearner.json', 100000) + + def configJsonFile(self, fileName, dbName, vgroups, replica, newFileName='', insert_rows=100000, + timestamp_step=10000): + tdLog.debug(f"configJsonFile {fileName}") + filePath = etool.curFile(__file__, fileName) + with open(filePath, 'r') as f: + data = json.load(f) + + if len(newFileName) == 0: + newFileName = fileName + + data['databases'][0]['dbinfo']['name'] = dbName + data['databases'][0]['dbinfo']['vgroups'] = vgroups + data['databases'][0]['dbinfo']['replica'] = replica + data['databases'][0]['super_tables'][0]['insert_rows'] = insert_rows + data['databases'][0]['super_tables'][0]['timestamp_step'] = timestamp_step + json_data = json.dumps(data) + filePath = etool.curFile(__file__, newFileName) + with open(filePath, "w") as file: + file.write(json_data) + + tdLog.debug(f"configJsonFile {json_data}") + + def insertData(self, configFile): + tdLog.info(f"insert data.") + # taosBenchmark run + jfile = etool.curFile(__file__, configFile) + etool.benchMark(json=jfile) + + def run(self): + tdLog.debug(f"start to excute {__file__}") + + self.insertData('splitVgroupByLearner.json') + + ''' + t1 = threading.Thread(target=self.alterDBThread) + t1.start() + tdLog.debug("threading started!!!!!") + time.sleep(3) + tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") + + t1.join() + + + t2 = threading.Thread(target=self.splitVgroupThread) + t2.start() + tdLog.debug("threading started!!!!!") + time.sleep(2) + tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") + + t2.join() + ''' + t3 = threading.Thread(target=self.RedistributeVGroups) + t3.start() + tdLog.debug("threading started!!!!!") + #time.sleep(2) + tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") + + t3.join() + + t4 = threading.Thread(target=self.balanceVGROUPThread) + t4.start() + tdLog.debug("threading started!!!!!") + #time.sleep(2) + tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") + + t4.join() + + + def alterDBThread(self): + tdLog.info("alter db start") + tdSql.execute('ALTER DATABASE db REPLICA 3') + if self.waitTransactionZero() is False: + tdLog.info(f"transaction not finished") + + def balanceVGROUPThread(self): + tdLog.info("balance VGROUP start") + tdSql.execute('BALANCE VGROUP') + if self.waitTransactionZero() is False: + tdLog.info(f"transaction not finished") + + def RedistributeVGroups(self): + sql = f"REDISTRIBUTE VGROUP 5 DNODE 1" + tdSql.execute(sql, show=True) + if self.waitTransactionZero() is False: + tdLog.exit(f"{sql} transaction not finished") + return False + + sql = f"REDISTRIBUTE VGROUP 4 DNODE 1" + tdSql.execute(sql, show=True) + if self.waitTransactionZero() is False: + tdLog.exit(f"{sql} transaction not finished") + return False + + sql = f"REDISTRIBUTE VGROUP 3 DNODE 1" + tdSql.execute(sql, show=True) + if self.waitTransactionZero() is False: + tdLog.exit(f"{sql} transaction not finished") + return False + + return True + + def splitVgroupThread(self): + tdSql.execute('use db') + rowLen = tdSql.query('show vgroups') + if rowLen > 0: + vgroupId = tdSql.getData(0, 0) + tdLog.debug(f"splitVgroupThread vgroupId:{vgroupId}") + tdSql.execute(f"split vgroup {vgroupId}") + else: + tdLog.exit("get vgroupId fail!") + if self.waitTransactionZero() is False: + tdLog.info(f"transaction not finished") + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file From 8c1fdc92ed8b498c79eed08e5fef37350a13561a Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 6 Nov 2024 08:33:33 +0800 Subject: [PATCH 04/25] fix/TS-5251-add-conflict-check-add-case --- tests/parallel_test/cases.task | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index c6596be7fc..9b52f0ba1d 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -35,6 +35,7 @@ ,,y,army,./pytest.sh python3 ./test.py -f query/accuracy/test_having.py ,,y,army,./pytest.sh python3 ./test.py -f insert/insert_basic.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f cluster/splitVgroupByLearner.py -N 3 +,,y,army,./pytest.sh python3 ./test.py -f cluster/compactDBConflict.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f authorith/authBasic.py -N 3 ,,n,army,python3 ./test.py -f cmdline/fullopt.py ,,y,army,./pytest.sh python3 ./test.py -f query/show.py -N 3 From 3598b3b917e1d73b0a7d7e24b73501f871c2323a Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 6 Nov 2024 09:49:27 +0800 Subject: [PATCH 05/25] fix/TS-5251-add-conflict-check-add-case --- source/dnode/mnode/impl/src/mndTrans.c | 6 ++- tests/army/cluster/compactDBConflict.py | 69 ++++++++++++++++++------- 2 files changed, 54 insertions(+), 21 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index de8b1a243b..e51500bf34 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -949,8 +949,10 @@ int32_t mndTransCheckConflictWithCompact(SMnode *pMnode, STrans *pTrans) { pIter = sdbFetch(pMnode->pSdb, SDB_COMPACT, pIter, (void **)&pCompact); if (pIter == NULL) break; - if (pTrans->conflict == TRN_CONFLICT_GLOBAL || pTrans->conflict == TRN_CONFLICT_DB || - pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { + if (pTrans->conflict == TRN_CONFLICT_GLOBAL) { + thisConflict = true; + } + if (pTrans->conflict == TRN_CONFLICT_DB || pTrans->conflict == TRN_CONFLICT_DB_INSIDE) { if (strcasecmp(pTrans->dbname, pCompact->dbname) == 0) thisConflict = true; } diff --git a/tests/army/cluster/compactDBConflict.py b/tests/army/cluster/compactDBConflict.py index f02272cc1f..12962ed476 100644 --- a/tests/army/cluster/compactDBConflict.py +++ b/tests/army/cluster/compactDBConflict.py @@ -58,40 +58,72 @@ class TDTestCase(TBase): self.insertData('splitVgroupByLearner.json') - ''' - t1 = threading.Thread(target=self.alterDBThread) + tdSql.execute('use db') + + t0 = threading.Thread(target=self.compactDBThread) + t0.start() + tdLog.debug("threading started!!!!!") + tdSql.error('ALTER DATABASE db REPLICA 3;', expectErrInfo="Transaction not completed due to conflict with compact") + t0.join() + + t1 = threading.Thread(target=self.compactDBThread) t1.start() tdLog.debug("threading started!!!!!") - time.sleep(3) - tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") - + time.sleep(1) + tdSql.error('REDISTRIBUTE VGROUP 5 DNODE 1;', expectErrInfo="Transaction not completed due to conflict with compact") t1.join() - - t2 = threading.Thread(target=self.splitVgroupThread) + t2 = threading.Thread(target=self.compactDBThread) t2.start() tdLog.debug("threading started!!!!!") - time.sleep(2) - tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") - + rowLen = tdSql.query('show vgroups') + if rowLen > 0: + vgroupId = tdSql.getData(0, 0) + tdLog.debug(f"splitVgroupThread vgroupId:{vgroupId}") + tdSql.error('REDISTRIBUTE VGROUP 5 DNODE 1;', expectErrInfo="Transaction not completed due to conflict with compact") t2.join() - ''' - t3 = threading.Thread(target=self.RedistributeVGroups) + + t3 = threading.Thread(target=self.compactDBThread) t3.start() tdLog.debug("threading started!!!!!") - #time.sleep(2) - tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") - + time.sleep(1) + tdSql.error('BALANCE VGROUP;', expectErrInfo="Transaction not completed due to conflict with compact") t3.join() - t4 = threading.Thread(target=self.balanceVGROUPThread) + t4 = threading.Thread(target=self.splitVgroupThread) t4.start() tdLog.debug("threading started!!!!!") - #time.sleep(2) + time.sleep(1) tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") - t4.join() + + t5 = threading.Thread(target=self.RedistributeVGroups) + t5.start() + tdLog.debug("threading started!!!!!") + time.sleep(1) + tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") + t5.join() + t6 = threading.Thread(target=self.balanceVGROUPThread) + t6.start() + tdLog.debug("threading started!!!!!") + time.sleep(1) + tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") + t6.join() + + t7 = threading.Thread(target=self.alterDBThread) + t7.start() + tdLog.debug("threading started!!!!!") + time.sleep(1) + tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") + t7.join() + + + def compactDBThread(self): + tdLog.info("compact db start") + tdSql.execute('compact DATABASE db') + if self.waitCompactsZero() is False: + tdLog.info(f"compact not finished") def alterDBThread(self): tdLog.info("alter db start") @@ -127,7 +159,6 @@ class TDTestCase(TBase): return True def splitVgroupThread(self): - tdSql.execute('use db') rowLen = tdSql.query('show vgroups') if rowLen > 0: vgroupId = tdSql.getData(0, 0) From b1d5dc9e05426da5fc804ea6f808f06caf573af9 Mon Sep 17 00:00:00 2001 From: dmchen Date: Mon, 11 Nov 2024 09:54:56 +0800 Subject: [PATCH 06/25] fix/TS-5251-add-conflic-check-fix-case --- tests/army/cluster/compactDBConflict.py | 111 +++++++++++++----------- 1 file changed, 59 insertions(+), 52 deletions(-) diff --git a/tests/army/cluster/compactDBConflict.py b/tests/army/cluster/compactDBConflict.py index 12962ed476..975bf72588 100644 --- a/tests/army/cluster/compactDBConflict.py +++ b/tests/army/cluster/compactDBConflict.py @@ -19,63 +19,68 @@ import threading class TDTestCase(TBase): - def init(self, conn, logSql, replicaVar=1): + def init(self, conn, logSql, replicaVar=1): tdLog.debug(f"start to init {__file__}") self.replicaVar = int(replicaVar) tdSql.init(conn.cursor(), logSql) # output sql.txt file - self.configJsonFile('splitVgroupByLearner.json', 'db', 4, 1, 'splitVgroupByLearner.json', 100000) + self.configJsonFile('splitVgroupByLearner.json', 'db', 4, 1, 'compactDBConflict.json', 100000) - def configJsonFile(self, fileName, dbName, vgroups, replica, newFileName='', insert_rows=100000, + def configJsonFile(self, fileName, dbName, vgroups, replica, newFileName='', insert_rows=100000, timestamp_step=10000): - tdLog.debug(f"configJsonFile {fileName}") - filePath = etool.curFile(__file__, fileName) - with open(filePath, 'r') as f: - data = json.load(f) + tdLog.debug(f"configJsonFile {fileName}") + filePath = etool.curFile(__file__, fileName) + with open(filePath, 'r') as f: + data = json.load(f) - if len(newFileName) == 0: - newFileName = fileName + if len(newFileName) == 0: + newFileName = fileName - data['databases'][0]['dbinfo']['name'] = dbName - data['databases'][0]['dbinfo']['vgroups'] = vgroups - data['databases'][0]['dbinfo']['replica'] = replica - data['databases'][0]['super_tables'][0]['insert_rows'] = insert_rows - data['databases'][0]['super_tables'][0]['timestamp_step'] = timestamp_step - json_data = json.dumps(data) - filePath = etool.curFile(__file__, newFileName) - with open(filePath, "w") as file: - file.write(json_data) + data['databases'][0]['dbinfo']['name'] = dbName + data['databases'][0]['dbinfo']['vgroups'] = vgroups + data['databases'][0]['dbinfo']['replica'] = replica + data['databases'][0]['super_tables'][0]['insert_rows'] = insert_rows + data['databases'][0]['super_tables'][0]['timestamp_step'] = timestamp_step + json_data = json.dumps(data) + filePath = etool.curFile(__file__, newFileName) + with open(filePath, "w") as file: + file.write(json_data) - tdLog.debug(f"configJsonFile {json_data}") + tdLog.debug(f"configJsonFile {json_data}") - def insertData(self, configFile): - tdLog.info(f"insert data.") - # taosBenchmark run - jfile = etool.curFile(__file__, configFile) - etool.benchMark(json=jfile) + def insertData(self, configFile): + tdLog.info(f"insert data.") + # taosBenchmark run + jfile = etool.curFile(__file__, configFile) + etool.benchMark(json=jfile) - def run(self): + def run(self): tdLog.debug(f"start to excute {__file__}") self.insertData('splitVgroupByLearner.json') tdSql.execute('use db') - t0 = threading.Thread(target=self.compactDBThread) + event0 = threading.Event() + t0 = threading.Thread(target=self.compactDBThread, args=(event0)) t0.start() tdLog.debug("threading started!!!!!") + event0.wait() tdSql.error('ALTER DATABASE db REPLICA 3;', expectErrInfo="Transaction not completed due to conflict with compact") t0.join() - t1 = threading.Thread(target=self.compactDBThread) + event1 = threading.Event() + t1 = threading.Thread(target=self.compactDBThread, args=(event1)) t1.start() tdLog.debug("threading started!!!!!") - time.sleep(1) + event1.wait() tdSql.error('REDISTRIBUTE VGROUP 5 DNODE 1;', expectErrInfo="Transaction not completed due to conflict with compact") t1.join() - t2 = threading.Thread(target=self.compactDBThread) + event2 = threading.Event() + t2 = threading.Thread(target=self.compactDBThread, args=(event2)) t2.start() tdLog.debug("threading started!!!!!") + event2.wait() rowLen = tdSql.query('show vgroups') if rowLen > 0: vgroupId = tdSql.getData(0, 0) @@ -83,10 +88,11 @@ class TDTestCase(TBase): tdSql.error('REDISTRIBUTE VGROUP 5 DNODE 1;', expectErrInfo="Transaction not completed due to conflict with compact") t2.join() - t3 = threading.Thread(target=self.compactDBThread) + event3 = threading.Event() + t3 = threading.Thread(target=self.compactDBThread, args=(event3)) t3.start() tdLog.debug("threading started!!!!!") - time.sleep(1) + event3.wait() tdSql.error('BALANCE VGROUP;', expectErrInfo="Transaction not completed due to conflict with compact") t3.join() @@ -119,31 +125,32 @@ class TDTestCase(TBase): t7.join() - def compactDBThread(self): - tdLog.info("compact db start") - tdSql.execute('compact DATABASE db') - if self.waitCompactsZero() is False: - tdLog.info(f"compact not finished") + def compactDBThread(self, event): + tdLog.info("compact db start") + tdSql.execute('compact DATABASE db') + event.set() + if self.waitCompactsZero() is False: + tdLog.info(f"compact not finished") - def alterDBThread(self): - tdLog.info("alter db start") - tdSql.execute('ALTER DATABASE db REPLICA 3') - if self.waitTransactionZero() is False: - tdLog.info(f"transaction not finished") + def alterDBThread(self): + tdLog.info("alter db start") + tdSql.execute('ALTER DATABASE db REPLICA 3') + if self.waitTransactionZero() is False: + tdLog.info(f"transaction not finished") - def balanceVGROUPThread(self): - tdLog.info("balance VGROUP start") - tdSql.execute('BALANCE VGROUP') - if self.waitTransactionZero() is False: - tdLog.info(f"transaction not finished") + def balanceVGROUPThread(self): + tdLog.info("balance VGROUP start") + tdSql.execute('BALANCE VGROUP') + if self.waitTransactionZero() is False: + tdLog.info(f"transaction not finished") - def RedistributeVGroups(self): + def RedistributeVGroups(self): sql = f"REDISTRIBUTE VGROUP 5 DNODE 1" tdSql.execute(sql, show=True) if self.waitTransactionZero() is False: tdLog.exit(f"{sql} transaction not finished") return False - + sql = f"REDISTRIBUTE VGROUP 4 DNODE 1" tdSql.execute(sql, show=True) if self.waitTransactionZero() is False: @@ -158,7 +165,7 @@ class TDTestCase(TBase): return True - def splitVgroupThread(self): + def splitVgroupThread(self): rowLen = tdSql.query('show vgroups') if rowLen > 0: vgroupId = tdSql.getData(0, 0) @@ -169,9 +176,9 @@ class TDTestCase(TBase): if self.waitTransactionZero() is False: tdLog.info(f"transaction not finished") - def stop(self): - tdSql.close() - tdLog.success(f"{__file__} successfully executed") + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") tdCases.addLinux(__file__, TDTestCase()) From d9ff5576005968c5ea2e7349cc23703f7514fe74 Mon Sep 17 00:00:00 2001 From: dmchen Date: Mon, 11 Nov 2024 09:57:13 +0800 Subject: [PATCH 07/25] fix/TS-5251-add-conflict-check-fix-case --- tests/army/cluster/compactDBConflict.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/army/cluster/compactDBConflict.py b/tests/army/cluster/compactDBConflict.py index 975bf72588..c2ce8f1d96 100644 --- a/tests/army/cluster/compactDBConflict.py +++ b/tests/army/cluster/compactDBConflict.py @@ -23,7 +23,7 @@ class TDTestCase(TBase): tdLog.debug(f"start to init {__file__}") self.replicaVar = int(replicaVar) tdSql.init(conn.cursor(), logSql) # output sql.txt file - self.configJsonFile('splitVgroupByLearner.json', 'db', 4, 1, 'compactDBConflict.json', 100000) + self.configJsonFile('compactDBConflict.json', 'db', 4, 1, 'compactDBConflict.json', 100000) def configJsonFile(self, fileName, dbName, vgroups, replica, newFileName='', insert_rows=100000, timestamp_step=10000): @@ -56,7 +56,7 @@ class TDTestCase(TBase): def run(self): tdLog.debug(f"start to excute {__file__}") - self.insertData('splitVgroupByLearner.json') + self.insertData('compactDBConflict.json') tdSql.execute('use db') From 659abc1dc426c9af37cb73d52d45f2efa97f8fe6 Mon Sep 17 00:00:00 2001 From: dmchen Date: Mon, 11 Nov 2024 10:41:56 +0800 Subject: [PATCH 08/25] fix/TS-5251-add-conflict-check-fix-case --- tests/army/cluster/compactDBConflict.py | 30 ++++++++++++------------- 1 file changed, 15 insertions(+), 15 deletions(-) diff --git a/tests/army/cluster/compactDBConflict.py b/tests/army/cluster/compactDBConflict.py index c2ce8f1d96..0645fcd8e1 100644 --- a/tests/army/cluster/compactDBConflict.py +++ b/tests/army/cluster/compactDBConflict.py @@ -60,26 +60,26 @@ class TDTestCase(TBase): tdSql.execute('use db') - event0 = threading.Event() - t0 = threading.Thread(target=self.compactDBThread, args=(event0)) + event = threading.Event() + t0 = threading.Thread(target=self.compactDBThread, args=('', event)) t0.start() - tdLog.debug("threading started!!!!!") - event0.wait() + tdLog.debug("t0 threading started!!!!!") + event.wait() tdSql.error('ALTER DATABASE db REPLICA 3;', expectErrInfo="Transaction not completed due to conflict with compact") t0.join() event1 = threading.Event() - t1 = threading.Thread(target=self.compactDBThread, args=(event1)) + t1 = threading.Thread(target=self.compactDBThread, args=('', event1)) t1.start() - tdLog.debug("threading started!!!!!") + tdLog.debug("t1 threading started!!!!!") event1.wait() tdSql.error('REDISTRIBUTE VGROUP 5 DNODE 1;', expectErrInfo="Transaction not completed due to conflict with compact") t1.join() event2 = threading.Event() - t2 = threading.Thread(target=self.compactDBThread, args=(event2)) + t2 = threading.Thread(target=self.compactDBThread, args=('', event2)) t2.start() - tdLog.debug("threading started!!!!!") + tdLog.debug("t2 threading started!!!!!") event2.wait() rowLen = tdSql.query('show vgroups') if rowLen > 0: @@ -89,43 +89,43 @@ class TDTestCase(TBase): t2.join() event3 = threading.Event() - t3 = threading.Thread(target=self.compactDBThread, args=(event3)) + t3 = threading.Thread(target=self.compactDBThread, args=('', event3)) t3.start() - tdLog.debug("threading started!!!!!") + tdLog.debug("t3 threading started!!!!!") event3.wait() tdSql.error('BALANCE VGROUP;', expectErrInfo="Transaction not completed due to conflict with compact") t3.join() t4 = threading.Thread(target=self.splitVgroupThread) t4.start() - tdLog.debug("threading started!!!!!") + tdLog.debug("t4 threading started!!!!!") time.sleep(1) tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") t4.join() t5 = threading.Thread(target=self.RedistributeVGroups) t5.start() - tdLog.debug("threading started!!!!!") + tdLog.debug("t5 threading started!!!!!") time.sleep(1) tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") t5.join() t6 = threading.Thread(target=self.balanceVGROUPThread) t6.start() - tdLog.debug("threading started!!!!!") + tdLog.debug("t6 threading started!!!!!") time.sleep(1) tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") t6.join() t7 = threading.Thread(target=self.alterDBThread) t7.start() - tdLog.debug("threading started!!!!!") + tdLog.debug("t7 threading started!!!!!") time.sleep(1) tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") t7.join() - def compactDBThread(self, event): + def compactDBThread(self, p, event): tdLog.info("compact db start") tdSql.execute('compact DATABASE db') event.set() From 73d19520de6a1b8d76cb8dca9880ff9d16220f46 Mon Sep 17 00:00:00 2001 From: dmchen Date: Mon, 11 Nov 2024 14:03:03 +0800 Subject: [PATCH 09/25] fix/TS-5251-add-conflicat-check-fix-case --- tests/army/cluster/compactDBConflict.json | 62 +++++++++++++++++++++++ 1 file changed, 62 insertions(+) create mode 100644 tests/army/cluster/compactDBConflict.json diff --git a/tests/army/cluster/compactDBConflict.json b/tests/army/cluster/compactDBConflict.json new file mode 100644 index 0000000000..ab9b72d260 --- /dev/null +++ b/tests/army/cluster/compactDBConflict.json @@ -0,0 +1,62 @@ +{ + "filetype": "insert", + "cfgdir": "/etc/taos", + "host": "127.0.0.1", + "port": 6030, + "user": "root", + "password": "taosdata", + "connection_pool_size": 8, + "num_of_records_per_req": 3000, + "prepared_rand": 3000, + "thread_count": 2, + "create_table_thread_count": 1, + "confirm_parameter_prompt": "no", + "continue_if_fail": "yes", + "databases": [ + { + "dbinfo": { + "name": "db", + "drop": "yes", + "vgroups": 2, + "replica": 3, + "duration":"1d", + "wal_retention_period": 1, + "wal_retention_size": 1, + "keep": "3d,6d,30d" + }, + "super_tables": [ + { + "name": "stb", + "child_table_exists": "no", + "childtable_count": 10, + "insert_rows": 100000000, + "childtable_prefix": "d", + "insert_mode": "taosc", + "timestamp_step": 10000, + "start_timestamp":"now-12d", + "columns": [ + { "type": "bool", "name": "bc"}, + { "type": "float", "name": "fc" }, + { "type": "double", "name": "dc"}, + { "type": "tinyint", "name": "ti"}, + { "type": "smallint", "name": "si" }, + { "type": "int", "name": "ic" }, + { "type": "bigint", "name": "bi" }, + { "type": "utinyint", "name": "uti"}, + { "type": "usmallint", "name": "usi"}, + { "type": "uint", "name": "ui" }, + { "type": "ubigint", "name": "ubi"}, + { "type": "binary", "name": "bin", "len": 16}, + { "type": "nchar", "name": "nch", "len": 32} + ], + "tags": [ + {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, + {"name": "location","type": "binary", "len": 16, "values": + ["San Francisco", "Los Angles", "San Diego", "San Jose", "Palo Alto", "Campbell", "Mountain View","Sunnyvale", "Santa Clara", "Cupertino"] + } + ] + } + ] + } + ] +} From 8e45332228761e16007049ccf14c40329be83468 Mon Sep 17 00:00:00 2001 From: dmchen Date: Tue, 12 Nov 2024 11:26:30 +0800 Subject: [PATCH 10/25] fix/TS-5251-ad-conflict-check-fix-case --- tests/army/cluster/compactDBConflict.json | 62 ----------- .../6-cluster}/compactDBConflict.py | 100 ++++++++++-------- 2 files changed, 53 insertions(+), 109 deletions(-) delete mode 100644 tests/army/cluster/compactDBConflict.json rename tests/{army/cluster => system-test/6-cluster}/compactDBConflict.py (71%) diff --git a/tests/army/cluster/compactDBConflict.json b/tests/army/cluster/compactDBConflict.json deleted file mode 100644 index ab9b72d260..0000000000 --- a/tests/army/cluster/compactDBConflict.json +++ /dev/null @@ -1,62 +0,0 @@ -{ - "filetype": "insert", - "cfgdir": "/etc/taos", - "host": "127.0.0.1", - "port": 6030, - "user": "root", - "password": "taosdata", - "connection_pool_size": 8, - "num_of_records_per_req": 3000, - "prepared_rand": 3000, - "thread_count": 2, - "create_table_thread_count": 1, - "confirm_parameter_prompt": "no", - "continue_if_fail": "yes", - "databases": [ - { - "dbinfo": { - "name": "db", - "drop": "yes", - "vgroups": 2, - "replica": 3, - "duration":"1d", - "wal_retention_period": 1, - "wal_retention_size": 1, - "keep": "3d,6d,30d" - }, - "super_tables": [ - { - "name": "stb", - "child_table_exists": "no", - "childtable_count": 10, - "insert_rows": 100000000, - "childtable_prefix": "d", - "insert_mode": "taosc", - "timestamp_step": 10000, - "start_timestamp":"now-12d", - "columns": [ - { "type": "bool", "name": "bc"}, - { "type": "float", "name": "fc" }, - { "type": "double", "name": "dc"}, - { "type": "tinyint", "name": "ti"}, - { "type": "smallint", "name": "si" }, - { "type": "int", "name": "ic" }, - { "type": "bigint", "name": "bi" }, - { "type": "utinyint", "name": "uti"}, - { "type": "usmallint", "name": "usi"}, - { "type": "uint", "name": "ui" }, - { "type": "ubigint", "name": "ubi"}, - { "type": "binary", "name": "bin", "len": 16}, - { "type": "nchar", "name": "nch", "len": 32} - ], - "tags": [ - {"type": "tinyint", "name": "groupid","max": 10,"min": 1}, - {"name": "location","type": "binary", "len": 16, "values": - ["San Francisco", "Los Angles", "San Diego", "San Jose", "Palo Alto", "Campbell", "Mountain View","Sunnyvale", "Santa Clara", "Cupertino"] - } - ] - } - ] - } - ] -} diff --git a/tests/army/cluster/compactDBConflict.py b/tests/system-test/6-cluster/compactDBConflict.py similarity index 71% rename from tests/army/cluster/compactDBConflict.py rename to tests/system-test/6-cluster/compactDBConflict.py index 0645fcd8e1..118557d563 100644 --- a/tests/army/cluster/compactDBConflict.py +++ b/tests/system-test/6-cluster/compactDBConflict.py @@ -10,116 +10,97 @@ ################################################################### # -*- coding: utf-8 -*- -import frame.etool -from frame.caseBase import * -from frame.cases import * -from frame import * -import json +from util.log import * +from util.cases import * +from util.dnodes import * +from util.sql import * import threading -class TDTestCase(TBase): +class TDTestCase: def init(self, conn, logSql, replicaVar=1): tdLog.debug(f"start to init {__file__}") self.replicaVar = int(replicaVar) tdSql.init(conn.cursor(), logSql) # output sql.txt file - self.configJsonFile('compactDBConflict.json', 'db', 4, 1, 'compactDBConflict.json', 100000) - - def configJsonFile(self, fileName, dbName, vgroups, replica, newFileName='', insert_rows=100000, - timestamp_step=10000): - tdLog.debug(f"configJsonFile {fileName}") - filePath = etool.curFile(__file__, fileName) - with open(filePath, 'r') as f: - data = json.load(f) - - if len(newFileName) == 0: - newFileName = fileName - - data['databases'][0]['dbinfo']['name'] = dbName - data['databases'][0]['dbinfo']['vgroups'] = vgroups - data['databases'][0]['dbinfo']['replica'] = replica - data['databases'][0]['super_tables'][0]['insert_rows'] = insert_rows - data['databases'][0]['super_tables'][0]['timestamp_step'] = timestamp_step - json_data = json.dumps(data) - filePath = etool.curFile(__file__, newFileName) - with open(filePath, "w") as file: - file.write(json_data) - - tdLog.debug(f"configJsonFile {json_data}") - - def insertData(self, configFile): - tdLog.info(f"insert data.") - # taosBenchmark run - jfile = etool.curFile(__file__, configFile) - etool.benchMark(json=jfile) def run(self): tdLog.debug(f"start to excute {__file__}") - self.insertData('compactDBConflict.json') + tdSql.execute('CREATE DATABASE db vgroups 4 replica 1;') - tdSql.execute('use db') + tdSql.execute('use db;') + tdLog.debug("start test1") event = threading.Event() t0 = threading.Thread(target=self.compactDBThread, args=('', event)) t0.start() - tdLog.debug("t0 threading started!!!!!") + tdLog.info("t0 threading started,wait compact db tran finish") event.wait() tdSql.error('ALTER DATABASE db REPLICA 3;', expectErrInfo="Transaction not completed due to conflict with compact") + tdLog.info("wait compact db finish") t0.join() + tdLog.debug("start test2") event1 = threading.Event() t1 = threading.Thread(target=self.compactDBThread, args=('', event1)) t1.start() - tdLog.debug("t1 threading started!!!!!") + tdLog.info("t1 threading started,wait compact db tran finish") event1.wait() tdSql.error('REDISTRIBUTE VGROUP 5 DNODE 1;', expectErrInfo="Transaction not completed due to conflict with compact") + tdLog.info("wait compact db finish") t1.join() + tdLog.debug("start test3") event2 = threading.Event() t2 = threading.Thread(target=self.compactDBThread, args=('', event2)) t2.start() - tdLog.debug("t2 threading started!!!!!") + tdLog.info("t2 threading started,wait compact db tran finish") event2.wait() rowLen = tdSql.query('show vgroups') if rowLen > 0: vgroupId = tdSql.getData(0, 0) - tdLog.debug(f"splitVgroupThread vgroupId:{vgroupId}") + tdLog.info(f"splitVgroupThread vgroupId:{vgroupId}") tdSql.error('REDISTRIBUTE VGROUP 5 DNODE 1;', expectErrInfo="Transaction not completed due to conflict with compact") + tdLog.info("wait compact db finish") t2.join() + tdLog.debug("start test4") event3 = threading.Event() t3 = threading.Thread(target=self.compactDBThread, args=('', event3)) t3.start() - tdLog.debug("t3 threading started!!!!!") + tdLog.info("t3 threading started!!!!!") event3.wait() tdSql.error('BALANCE VGROUP;', expectErrInfo="Transaction not completed due to conflict with compact") t3.join() + tdLog.debug("start test5") t4 = threading.Thread(target=self.splitVgroupThread) t4.start() - tdLog.debug("t4 threading started!!!!!") + tdLog.info("t4 threading started!!!!!") time.sleep(1) tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") t4.join() + tdLog.debug("start test6") t5 = threading.Thread(target=self.RedistributeVGroups) t5.start() - tdLog.debug("t5 threading started!!!!!") + tdLog.info("t5 threading started!!!!!") time.sleep(1) tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") t5.join() + tdLog.debug("start test7") t6 = threading.Thread(target=self.balanceVGROUPThread) t6.start() - tdLog.debug("t6 threading started!!!!!") + tdLog.info("t6 threading started!!!!!") time.sleep(1) tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") t6.join() + tdLog.debug("start test8") t7 = threading.Thread(target=self.alterDBThread) t7.start() - tdLog.debug("t7 threading started!!!!!") + tdLog.info("t7 threading started!!!!!") time.sleep(1) tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") t7.join() @@ -175,7 +156,32 @@ class TDTestCase(TBase): tdLog.exit("get vgroupId fail!") if self.waitTransactionZero() is False: tdLog.info(f"transaction not finished") - + + def waitTransactionZero(self, seconds = 300, interval = 1): + # wait end + for i in range(seconds): + sql ="show transactions;" + rows = tdSql.query(sql) + if rows == 0: + tdLog.info("transaction count became zero.") + return True + #tdLog.info(f"i={i} wait ...") + time.sleep(interval) + + return False + def waitCompactsZero(self, seconds = 300, interval = 1): + # wait end + for i in range(seconds): + sql ="show compacts;" + rows = tdSql.query(sql) + if rows == 0: + tdLog.info("compacts count became zero.") + return True + #tdLog.info(f"i={i} wait ...") + time.sleep(interval) + + return False + def stop(self): tdSql.close() tdLog.success(f"{__file__} successfully executed") From 830cbf6b2c074b82d9900d0201b4973b76502dd3 Mon Sep 17 00:00:00 2001 From: dmchen Date: Tue, 12 Nov 2024 11:29:33 +0800 Subject: [PATCH 11/25] fix/TS-5251-add-conflict-check-fix-case --- tests/parallel_test/cases.task | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 9b52f0ba1d..a47bc65fc1 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -35,7 +35,6 @@ ,,y,army,./pytest.sh python3 ./test.py -f query/accuracy/test_having.py ,,y,army,./pytest.sh python3 ./test.py -f insert/insert_basic.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f cluster/splitVgroupByLearner.py -N 3 -,,y,army,./pytest.sh python3 ./test.py -f cluster/compactDBConflict.py -N 3 ,,y,army,./pytest.sh python3 ./test.py -f authorith/authBasic.py -N 3 ,,n,army,python3 ./test.py -f cmdline/fullopt.py ,,y,army,./pytest.sh python3 ./test.py -f query/show.py -N 3 @@ -738,6 +737,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas.py -N 4 -M 1 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_insertdatas_querys.py -N 4 -M 1 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/vnode/4dnode1mnode_basic_replica3_vgroups.py -N 4 -M 1 +,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/compactDBConflict.py -N 3 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/between.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/distinct.py -Q 2 ,,y,system-test,./pytest.sh python3 ./test.py -f 2-query/varchar.py -Q 2 From 93c58252a645579bffbb053826e752452cc51785 Mon Sep 17 00:00:00 2001 From: dmchen Date: Tue, 12 Nov 2024 16:54:18 +0800 Subject: [PATCH 12/25] fix/TS-5251-add-conflict-check-fix-case --- tests/system-test/6-cluster/compactDBConflict.py | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/system-test/6-cluster/compactDBConflict.py b/tests/system-test/6-cluster/compactDBConflict.py index 118557d563..dc6321f955 100644 --- a/tests/system-test/6-cluster/compactDBConflict.py +++ b/tests/system-test/6-cluster/compactDBConflict.py @@ -56,11 +56,7 @@ class TDTestCase: t2.start() tdLog.info("t2 threading started,wait compact db tran finish") event2.wait() - rowLen = tdSql.query('show vgroups') - if rowLen > 0: - vgroupId = tdSql.getData(0, 0) - tdLog.info(f"splitVgroupThread vgroupId:{vgroupId}") - tdSql.error('REDISTRIBUTE VGROUP 5 DNODE 1;', expectErrInfo="Transaction not completed due to conflict with compact") + tdSql.error('REDISTRIBUTE VGROUP 5 DNODE 1;', expectErrInfo="Transaction not completed due to conflict with compact") tdLog.info("wait compact db finish") t2.join() @@ -126,6 +122,7 @@ class TDTestCase: tdLog.info(f"transaction not finished") def RedistributeVGroups(self): + tdLog.info("REDISTRIBUTE VGROUP start") sql = f"REDISTRIBUTE VGROUP 5 DNODE 1" tdSql.execute(sql, show=True) if self.waitTransactionZero() is False: @@ -150,7 +147,7 @@ class TDTestCase: rowLen = tdSql.query('show vgroups') if rowLen > 0: vgroupId = tdSql.getData(0, 0) - tdLog.debug(f"splitVgroupThread vgroupId:{vgroupId}") + tdLog.info(f"splitVgroupThread vgroupId:{vgroupId} start") tdSql.execute(f"split vgroup {vgroupId}") else: tdLog.exit("get vgroupId fail!") From ce9b3e8462e1d410c675af40dff683f4f377e9e2 Mon Sep 17 00:00:00 2001 From: dmchen Date: Tue, 12 Nov 2024 19:23:25 +0800 Subject: [PATCH 13/25] fix/TS-5251-add-conflict-check --- tests/system-test/6-cluster/compactDBConflict.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/tests/system-test/6-cluster/compactDBConflict.py b/tests/system-test/6-cluster/compactDBConflict.py index dc6321f955..2fdaa80559 100644 --- a/tests/system-test/6-cluster/compactDBConflict.py +++ b/tests/system-test/6-cluster/compactDBConflict.py @@ -93,6 +93,7 @@ class TDTestCase: tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") t6.join() + ''' tdLog.debug("start test8") t7 = threading.Thread(target=self.alterDBThread) t7.start() @@ -100,6 +101,7 @@ class TDTestCase: time.sleep(1) tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") t7.join() + ''' def compactDBThread(self, p, event): From b98cf8df373c3e7f4189fbcf7be82e7c9b278571 Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 13 Nov 2024 09:06:41 +0800 Subject: [PATCH 14/25] fix/TS-5251-add-conflict-check-fix-case --- tests/system-test/6-cluster/compactDBConflict.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/system-test/6-cluster/compactDBConflict.py b/tests/system-test/6-cluster/compactDBConflict.py index 2fdaa80559..dc6321f955 100644 --- a/tests/system-test/6-cluster/compactDBConflict.py +++ b/tests/system-test/6-cluster/compactDBConflict.py @@ -93,7 +93,6 @@ class TDTestCase: tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") t6.join() - ''' tdLog.debug("start test8") t7 = threading.Thread(target=self.alterDBThread) t7.start() @@ -101,7 +100,6 @@ class TDTestCase: time.sleep(1) tdSql.error('compact database db;', expectErrInfo="Conflict transaction not completed") t7.join() - ''' def compactDBThread(self, p, event): From 5cf233893a0e18fa0b274c1e74235e4860985a8a Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 13 Nov 2024 15:20:35 +0800 Subject: [PATCH 15/25] fix/TS-5251-add-conflict-check-fix-case --- .../6-cluster/compactDBConflict.py | 49 +++++++++++-------- 1 file changed, 29 insertions(+), 20 deletions(-) diff --git a/tests/system-test/6-cluster/compactDBConflict.py b/tests/system-test/6-cluster/compactDBConflict.py index dc6321f955..b25da07964 100644 --- a/tests/system-test/6-cluster/compactDBConflict.py +++ b/tests/system-test/6-cluster/compactDBConflict.py @@ -14,6 +14,7 @@ from util.log import * from util.cases import * from util.dnodes import * from util.sql import * +from util.common import tdCom import threading @@ -32,7 +33,8 @@ class TDTestCase: tdLog.debug("start test1") event = threading.Event() - t0 = threading.Thread(target=self.compactDBThread, args=('', event)) + newTdSql=tdCom.newTdSql() + t0 = threading.Thread(target=self.compactDBThread, args=('', event, newTdSql)) t0.start() tdLog.info("t0 threading started,wait compact db tran finish") event.wait() @@ -42,7 +44,8 @@ class TDTestCase: tdLog.debug("start test2") event1 = threading.Event() - t1 = threading.Thread(target=self.compactDBThread, args=('', event1)) + newTdSql1=tdCom.newTdSql() + t1 = threading.Thread(target=self.compactDBThread, args=('', event1, newTdSql1)) t1.start() tdLog.info("t1 threading started,wait compact db tran finish") event1.wait() @@ -52,7 +55,8 @@ class TDTestCase: tdLog.debug("start test3") event2 = threading.Event() - t2 = threading.Thread(target=self.compactDBThread, args=('', event2)) + newTdSql2=tdCom.newTdSql() + t2 = threading.Thread(target=self.compactDBThread, args=('', event2, newTdSql2)) t2.start() tdLog.info("t2 threading started,wait compact db tran finish") event2.wait() @@ -62,7 +66,8 @@ class TDTestCase: tdLog.debug("start test4") event3 = threading.Event() - t3 = threading.Thread(target=self.compactDBThread, args=('', event3)) + newTdSql3=tdCom.newTdSql() + t3 = threading.Thread(target=self.compactDBThread, args=('', event3, newTdSql3)) t3.start() tdLog.info("t3 threading started!!!!!") event3.wait() @@ -70,7 +75,8 @@ class TDTestCase: t3.join() tdLog.debug("start test5") - t4 = threading.Thread(target=self.splitVgroupThread) + newTdSql4=tdCom.newTdSql() + t4 = threading.Thread(target=self.splitVgroupThread, args=('', newTdSql4)) t4.start() tdLog.info("t4 threading started!!!!!") time.sleep(1) @@ -78,7 +84,8 @@ class TDTestCase: t4.join() tdLog.debug("start test6") - t5 = threading.Thread(target=self.RedistributeVGroups) + newTdSql5=tdCom.newTdSql() + t5 = threading.Thread(target=self.RedistributeVGroups, args=('', newTdSql5)) t5.start() tdLog.info("t5 threading started!!!!!") time.sleep(1) @@ -86,7 +93,8 @@ class TDTestCase: t5.join() tdLog.debug("start test7") - t6 = threading.Thread(target=self.balanceVGROUPThread) + newTdSql6=tdCom.newTdSql() + t6 = threading.Thread(target=self.balanceVGROUPThread, args=('', newTdSql6)) t6.start() tdLog.info("t6 threading started!!!!!") time.sleep(1) @@ -94,7 +102,8 @@ class TDTestCase: t6.join() tdLog.debug("start test8") - t7 = threading.Thread(target=self.alterDBThread) + newTdSql7=tdCom.newTdSql() + t7 = threading.Thread(target=self.alterDBThread, args=('', newTdSql7)) t7.start() tdLog.info("t7 threading started!!!!!") time.sleep(1) @@ -102,53 +111,53 @@ class TDTestCase: t7.join() - def compactDBThread(self, p, event): + def compactDBThread(self, p, event, newtdSql): tdLog.info("compact db start") - tdSql.execute('compact DATABASE db') + newtdSql.execute('compact DATABASE db') event.set() if self.waitCompactsZero() is False: tdLog.info(f"compact not finished") - def alterDBThread(self): + def alterDBThread(self, p, newtdSql): tdLog.info("alter db start") - tdSql.execute('ALTER DATABASE db REPLICA 3') + newtdSql.execute('ALTER DATABASE db REPLICA 3') if self.waitTransactionZero() is False: tdLog.info(f"transaction not finished") - def balanceVGROUPThread(self): + def balanceVGROUPThread(self, p, newtdSql): tdLog.info("balance VGROUP start") - tdSql.execute('BALANCE VGROUP') + newtdSql.execute('BALANCE VGROUP') if self.waitTransactionZero() is False: tdLog.info(f"transaction not finished") - def RedistributeVGroups(self): + def RedistributeVGroups(self, p, newtdSql): tdLog.info("REDISTRIBUTE VGROUP start") sql = f"REDISTRIBUTE VGROUP 5 DNODE 1" - tdSql.execute(sql, show=True) + newtdSql.execute(sql, show=True) if self.waitTransactionZero() is False: tdLog.exit(f"{sql} transaction not finished") return False sql = f"REDISTRIBUTE VGROUP 4 DNODE 1" - tdSql.execute(sql, show=True) + newtdSql.execute(sql, show=True) if self.waitTransactionZero() is False: tdLog.exit(f"{sql} transaction not finished") return False sql = f"REDISTRIBUTE VGROUP 3 DNODE 1" - tdSql.execute(sql, show=True) + newtdSql.execute(sql, show=True) if self.waitTransactionZero() is False: tdLog.exit(f"{sql} transaction not finished") return False return True - def splitVgroupThread(self): + def splitVgroupThread(self, p, newtdSql): rowLen = tdSql.query('show vgroups') if rowLen > 0: vgroupId = tdSql.getData(0, 0) tdLog.info(f"splitVgroupThread vgroupId:{vgroupId} start") - tdSql.execute(f"split vgroup {vgroupId}") + newtdSql.execute(f"split vgroup {vgroupId}") else: tdLog.exit("get vgroupId fail!") if self.waitTransactionZero() is False: From 84cfce3e99c77bbddfc7aeaf6f90e2ca6a01a245 Mon Sep 17 00:00:00 2001 From: dmchen Date: Wed, 13 Nov 2024 18:15:20 +0800 Subject: [PATCH 16/25] fix/TS-5251-add-conflict-check-fix-case --- .../6-cluster/compactDBConflict.py | 27 ++++++++++--------- 1 file changed, 14 insertions(+), 13 deletions(-) diff --git a/tests/system-test/6-cluster/compactDBConflict.py b/tests/system-test/6-cluster/compactDBConflict.py index b25da07964..9f7f670087 100644 --- a/tests/system-test/6-cluster/compactDBConflict.py +++ b/tests/system-test/6-cluster/compactDBConflict.py @@ -115,59 +115,60 @@ class TDTestCase: tdLog.info("compact db start") newtdSql.execute('compact DATABASE db') event.set() - if self.waitCompactsZero() is False: + if self.waitCompactsZero(atdSql=newtdSql) is False: tdLog.info(f"compact not finished") def alterDBThread(self, p, newtdSql): tdLog.info("alter db start") newtdSql.execute('ALTER DATABASE db REPLICA 3') - if self.waitTransactionZero() is False: + if self.waitTransactionZero(atdSql=newtdSql) is False: tdLog.info(f"transaction not finished") def balanceVGROUPThread(self, p, newtdSql): tdLog.info("balance VGROUP start") newtdSql.execute('BALANCE VGROUP') - if self.waitTransactionZero() is False: + if self.waitTransactionZero(atdSql=newtdSql) is False: tdLog.info(f"transaction not finished") def RedistributeVGroups(self, p, newtdSql): tdLog.info("REDISTRIBUTE VGROUP start") sql = f"REDISTRIBUTE VGROUP 5 DNODE 1" newtdSql.execute(sql, show=True) - if self.waitTransactionZero() is False: + if self.waitTransactionZero(atdSql=newtdSql) is False: tdLog.exit(f"{sql} transaction not finished") return False sql = f"REDISTRIBUTE VGROUP 4 DNODE 1" newtdSql.execute(sql, show=True) - if self.waitTransactionZero() is False: + if self.waitTransactionZero(atdSql=newtdSql) is False: tdLog.exit(f"{sql} transaction not finished") return False sql = f"REDISTRIBUTE VGROUP 3 DNODE 1" newtdSql.execute(sql, show=True) - if self.waitTransactionZero() is False: + if self.waitTransactionZero(atdSql=newtdSql) is False: tdLog.exit(f"{sql} transaction not finished") return False return True def splitVgroupThread(self, p, newtdSql): - rowLen = tdSql.query('show vgroups') + newtdSql.execute(f"use db;") + rowLen = newtdSql.query('show vgroups') if rowLen > 0: - vgroupId = tdSql.getData(0, 0) + vgroupId = newtdSql.getData(0, 0) tdLog.info(f"splitVgroupThread vgroupId:{vgroupId} start") newtdSql.execute(f"split vgroup {vgroupId}") else: tdLog.exit("get vgroupId fail!") - if self.waitTransactionZero() is False: + if self.waitTransactionZero(atdSql=newtdSql) is False: tdLog.info(f"transaction not finished") - def waitTransactionZero(self, seconds = 300, interval = 1): + def waitTransactionZero(self, atdSql, seconds = 300, interval = 1): # wait end for i in range(seconds): sql ="show transactions;" - rows = tdSql.query(sql) + rows = atdSql.query(sql) if rows == 0: tdLog.info("transaction count became zero.") return True @@ -175,11 +176,11 @@ class TDTestCase: time.sleep(interval) return False - def waitCompactsZero(self, seconds = 300, interval = 1): + def waitCompactsZero(self, atdSql, seconds = 300, interval = 1): # wait end for i in range(seconds): sql ="show compacts;" - rows = tdSql.query(sql) + rows = atdSql.query(sql) if rows == 0: tdLog.info("compacts count became zero.") return True From ad55bb7a85aa8fea04c4bd9ac1e85e8a5c45c636 Mon Sep 17 00:00:00 2001 From: dmchen Date: Tue, 26 Nov 2024 09:07:05 +0800 Subject: [PATCH 17/25] fix/TS-5251-add-conflict-check-add-case --- tests/system-test/6-cluster/compactDBConflict.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/system-test/6-cluster/compactDBConflict.py b/tests/system-test/6-cluster/compactDBConflict.py index 9f7f670087..62ddd99585 100644 --- a/tests/system-test/6-cluster/compactDBConflict.py +++ b/tests/system-test/6-cluster/compactDBConflict.py @@ -60,7 +60,13 @@ class TDTestCase: t2.start() tdLog.info("t2 threading started,wait compact db tran finish") event2.wait() - tdSql.error('REDISTRIBUTE VGROUP 5 DNODE 1;', expectErrInfo="Transaction not completed due to conflict with compact") + rowLen = tdSql.query('show vgroups') + if rowLen > 0: + vgroupId = tdSql.getData(0, 0) + tdLog.info(f"splitVgroupThread vgroupId:{vgroupId} start") + tdSql.error(f"split vgroup {vgroupId}", expectErrInfo="Transaction not completed due to conflict with compact") + else: + tdLog.exit("get vgroupId fail!") tdLog.info("wait compact db finish") t2.join() From 6c820cc79cf5db6263aba832e19be016c218830b Mon Sep 17 00:00:00 2001 From: dmchen Date: Tue, 26 Nov 2024 10:04:13 +0800 Subject: [PATCH 18/25] fix/TS-5251-add-conflict-check-add-case --- source/dnode/mnode/impl/src/mndVgroup.c | 1 + tests/system-test/6-cluster/compactDBConflict.py | 8 ++++---- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 544db76a49..01dedd4762 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -3193,6 +3193,7 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro mInfo("trans:%d, used to split vgroup, vgId:%d", pTrans->id, pVgroup->vgId); mndTransSetDbName(pTrans, pDb->name, NULL); + TAOS_CHECK_GOTO(mndTransCheckConflictWithCompact(pMnode, pTrans), NULL, _OVER); SVgObj newVg1 = {0}; memcpy(&newVg1, pVgroup, sizeof(SVgObj)); diff --git a/tests/system-test/6-cluster/compactDBConflict.py b/tests/system-test/6-cluster/compactDBConflict.py index 62ddd99585..54d1513ad0 100644 --- a/tests/system-test/6-cluster/compactDBConflict.py +++ b/tests/system-test/6-cluster/compactDBConflict.py @@ -36,7 +36,7 @@ class TDTestCase: newTdSql=tdCom.newTdSql() t0 = threading.Thread(target=self.compactDBThread, args=('', event, newTdSql)) t0.start() - tdLog.info("t0 threading started,wait compact db tran finish") + tdLog.info("t0 threading started,wait compact db tran start") event.wait() tdSql.error('ALTER DATABASE db REPLICA 3;', expectErrInfo="Transaction not completed due to conflict with compact") tdLog.info("wait compact db finish") @@ -47,7 +47,7 @@ class TDTestCase: newTdSql1=tdCom.newTdSql() t1 = threading.Thread(target=self.compactDBThread, args=('', event1, newTdSql1)) t1.start() - tdLog.info("t1 threading started,wait compact db tran finish") + tdLog.info("t1 threading started,wait compact db tran start") event1.wait() tdSql.error('REDISTRIBUTE VGROUP 5 DNODE 1;', expectErrInfo="Transaction not completed due to conflict with compact") tdLog.info("wait compact db finish") @@ -58,12 +58,12 @@ class TDTestCase: newTdSql2=tdCom.newTdSql() t2 = threading.Thread(target=self.compactDBThread, args=('', event2, newTdSql2)) t2.start() - tdLog.info("t2 threading started,wait compact db tran finish") + tdLog.info("t2 threading started,wait compact db tran start") event2.wait() rowLen = tdSql.query('show vgroups') if rowLen > 0: vgroupId = tdSql.getData(0, 0) - tdLog.info(f"splitVgroupThread vgroupId:{vgroupId} start") + tdLog.info(f"split Vgroup vgroupId:{vgroupId} start") tdSql.error(f"split vgroup {vgroupId}", expectErrInfo="Transaction not completed due to conflict with compact") else: tdLog.exit("get vgroupId fail!") From 7d961a7fec175fa81aaae5a2459ca748e11dfa71 Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Wed, 27 Nov 2024 10:15:50 +0800 Subject: [PATCH 19/25] Enh close wal module open file. --- source/libs/wal/src/walMeta.c | 11 ++--- source/libs/wal/src/walWrite.c | 75 ++++++++++++++-------------------- 2 files changed, 36 insertions(+), 50 deletions(-) diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index ce2b9218b5..0000647a61 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -371,7 +371,7 @@ static int32_t walLogEntriesComplete(const SWal* pWal) { } static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) { - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); if (!pFileInfo) { TAOS_RETURN(TSDB_CODE_FAILED); @@ -384,7 +384,7 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) { if (taosStatFile(fnameStr, &fileSize, NULL, NULL) != 0) { wError("vgId:%d, failed to stat file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr); code = terrno; - TAOS_RETURN(code); + goto _exit; } int64_t records = TMAX(0, pFileInfo->lastVer - pFileInfo->firstVer + 1); int64_t lastEndOffset = records * sizeof(SWalIdxEntry); @@ -395,7 +395,7 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) { TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE); if (pFile == NULL) { - TAOS_RETURN(terrno); + goto _exit; } wInfo("vgId:%d, trim idx file. file: %s, size: %" PRId64 ", offset: %" PRId64, pWal->cfg.vgId, fnameStr, fileSize, @@ -404,10 +404,11 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) { code = taosFtruncateFile(pFile, lastEndOffset); if (code < 0) { wError("vgId:%d, failed to truncate file due to %s. file:%s", pWal->cfg.vgId, strerror(errno), fnameStr); - TAOS_RETURN(code); + goto _exit; } - (void)taosCloseFile(&pFile); +_exit: + (void)taosCloseFile(&pFile); TAOS_RETURN(TSDB_CODE_SUCCESS); } diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 66ead2fd26..6fe750d143 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -160,12 +160,12 @@ static int64_t walChangeWrite(SWal *pWal, int64_t ver) { int32_t walRollback(SWal *pWal, int64_t ver) { TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver); + int32_t code = 0; int64_t ret; char fnameStr[WAL_FILE_LEN]; if (ver > pWal->vers.lastVer || ver <= pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) { - TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - - TAOS_RETURN(TSDB_CODE_WAL_INVALID_VER); + code = TSDB_CODE_WAL_INVALID_VER; + goto _exit; } // find correct file @@ -173,9 +173,8 @@ int32_t walRollback(SWal *pWal, int64_t ver) { // change current files ret = walChangeWrite(pWal, ver); if (ret < 0) { - TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - - TAOS_RETURN(terrno); + code = terrno; + goto _exit; } // delete files in descending order @@ -200,23 +199,20 @@ int32_t walRollback(SWal *pWal, int64_t ver) { TAOS_UNUSED(taosCloseFile(&pWal->pIdxFile)); TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); if (pIdxFile == NULL) { - TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - - TAOS_RETURN(terrno); + code = terrno; + goto _exit; } int64_t idxOff = walGetVerIdxOffset(pWal, ver); ret = taosLSeekFile(pIdxFile, idxOff, SEEK_SET); if (ret < 0) { - TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - - TAOS_RETURN(terrno); + code = terrno; + goto _exit; } // read idx file and get log file pos SWalIdxEntry entry; if (taosReadFile(pIdxFile, &entry, sizeof(SWalIdxEntry)) != sizeof(SWalIdxEntry)) { - TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - - TAOS_RETURN(terrno); + code = terrno; + goto _exit; } walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr); @@ -225,70 +221,58 @@ int32_t walRollback(SWal *pWal, int64_t ver) { wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, fnameStr); if (pLogFile == NULL) { // TODO - TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - - TAOS_RETURN(terrno); + code = terrno; + goto _exit; } ret = taosLSeekFile(pLogFile, entry.offset, SEEK_SET); if (ret < 0) { // TODO - TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - - TAOS_RETURN(terrno); + code = terrno; + goto _exit; } // validate offset SWalCkHead head; int64_t size = taosReadFile(pLogFile, &head, sizeof(SWalCkHead)); if (size != sizeof(SWalCkHead)) { - TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - - TAOS_RETURN(terrno); + code = terrno; + goto _exit; } - int32_t code = walValidHeadCksum(&head); + code = walValidHeadCksum(&head); if (code != 0) { - TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - - TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); + code = TSDB_CODE_WAL_FILE_CORRUPTED; + goto _exit; } if (head.head.version != ver) { - TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - - TAOS_RETURN(TSDB_CODE_WAL_FILE_CORRUPTED); + code = TSDB_CODE_WAL_FILE_CORRUPTED; + goto _exit; } // truncate old files code = taosFtruncateFile(pLogFile, entry.offset); if (code < 0) { - TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - - TAOS_RETURN(code); + goto _exit; } code = taosFtruncateFile(pIdxFile, idxOff); if (code < 0) { - TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - - TAOS_RETURN(code); + goto _exit; } pWal->vers.lastVer = ver - 1; ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->lastVer = ver - 1; ((SWalFileInfo *)taosArrayGetLast(pWal->fileInfoSet))->fileSize = entry.offset; - TAOS_UNUSED(taosCloseFile(&pIdxFile)); - TAOS_UNUSED(taosCloseFile(&pLogFile)); - code = walSaveMeta(pWal); if (code < 0) { wError("vgId:%d, failed to save meta since %s", pWal->cfg.vgId, terrstr()); - TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - - TAOS_RETURN(code); + goto _exit; } - // unlock +_exit: + TAOS_UNUSED(taosCloseFile(&pIdxFile)); + TAOS_UNUSED(taosCloseFile(&pLogFile)); TAOS_UNUSED(taosThreadRwlockUnlock(&pWal->mutex)); - TAOS_RETURN(TSDB_CODE_SUCCESS); + TAOS_RETURN(code); } static int32_t walRollImpl(SWal *pWal) { @@ -718,6 +702,7 @@ static int32_t walInitWriteFile(SWal *pWal) { walBuildLogName(pWal, fileFirstVer, fnameStr); pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pLogTFile == NULL) { + taosCloseFile(&pIdxTFile); TAOS_RETURN(terrno); } // switch file From d609d144be9743aa506133853241cd29e474f7ad Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Wed, 27 Nov 2024 13:46:55 +0800 Subject: [PATCH 20/25] Fix ci crash. --- source/libs/wal/src/walWrite.c | 15 +++++++-------- 1 file changed, 7 insertions(+), 8 deletions(-) diff --git a/source/libs/wal/src/walWrite.c b/source/libs/wal/src/walWrite.c index 6fe750d143..69e70a0f47 100644 --- a/source/libs/wal/src/walWrite.c +++ b/source/libs/wal/src/walWrite.c @@ -160,9 +160,10 @@ static int64_t walChangeWrite(SWal *pWal, int64_t ver) { int32_t walRollback(SWal *pWal, int64_t ver) { TAOS_UNUSED(taosThreadRwlockWrlock(&pWal->mutex)); wInfo("vgId:%d, wal rollback for version %" PRId64, pWal->cfg.vgId, ver); - int32_t code = 0; - int64_t ret; - char fnameStr[WAL_FILE_LEN]; + int32_t code = 0; + int64_t ret; + char fnameStr[WAL_FILE_LEN]; + TdFilePtr pIdxFile = NULL, pLogFile = NULL; if (ver > pWal->vers.lastVer || ver <= pWal->vers.commitVer || ver <= pWal->vers.snapshotVer) { code = TSDB_CODE_WAL_INVALID_VER; goto _exit; @@ -196,8 +197,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { } walBuildIdxName(pWal, walGetCurFileFirstVer(pWal), fnameStr); - TAOS_UNUSED(taosCloseFile(&pWal->pIdxFile)); - TdFilePtr pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); + pIdxFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); if (pIdxFile == NULL) { code = terrno; goto _exit; @@ -216,8 +216,7 @@ int32_t walRollback(SWal *pWal, int64_t ver) { } walBuildLogName(pWal, walGetCurFileFirstVer(pWal), fnameStr); - TAOS_UNUSED(taosCloseFile(&pWal->pLogFile)); - TdFilePtr pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); + pLogFile = taosOpenFile(fnameStr, TD_FILE_WRITE | TD_FILE_READ | TD_FILE_APPEND); wDebug("vgId:%d, wal truncate file %s", pWal->cfg.vgId, fnameStr); if (pLogFile == NULL) { // TODO @@ -702,7 +701,7 @@ static int32_t walInitWriteFile(SWal *pWal) { walBuildLogName(pWal, fileFirstVer, fnameStr); pLogTFile = taosOpenFile(fnameStr, TD_FILE_CREATE | TD_FILE_WRITE | TD_FILE_APPEND); if (pLogTFile == NULL) { - taosCloseFile(&pIdxTFile); + TAOS_UNUSED(taosCloseFile(&pIdxTFile)); TAOS_RETURN(terrno); } // switch file From 1eb005a9f09ed2afd45a0b8fee729941a0c20a29 Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Wed, 27 Nov 2024 16:27:00 +0800 Subject: [PATCH 21/25] Fix error code return. --- source/libs/wal/src/walMeta.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 0000647a61..43688347db 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -395,6 +395,7 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) { TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE); if (pFile == NULL) { + code = terrno; goto _exit; } @@ -409,7 +410,7 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) { _exit: (void)taosCloseFile(&pFile); - TAOS_RETURN(TSDB_CODE_SUCCESS); + TAOS_RETURN(code); } static void printFileSet(int32_t vgId, SArray* fileSet, const char* str) { From edba0063aab89f0d3b57738424c4d90b61c7c1e6 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 27 Nov 2024 18:26:35 +0800 Subject: [PATCH 22/25] fix:[TD-33048] add ts to cols if dataFormat is true in schemaless to avoid schemal is old --- source/client/src/clientSml.c | 2 +- utils/test/c/sml_test.c | 81 +++++++++++++++++++++++++++++++++++ 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 8602421ed0..112fc8f5bc 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -233,7 +233,7 @@ int32_t smlBuildSuperTableInfo(SSmlHandle *info, SSmlLineInfo *currElement, SSml goto END; } SML_CHECK_CODE(smlBuildSTableMeta(info->dataFormat, sMeta)); - for (int i = 1; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++) { + for (int i = 0; i < pTableMeta->tableInfo.numOfTags + pTableMeta->tableInfo.numOfColumns; i++) { SSchema *col = pTableMeta->schema + i; SSmlKv kv = {.key = col->name, .keyLen = strlen(col->name), .type = col->type}; if (col->type == TSDB_DATA_TYPE_NCHAR) { diff --git a/utils/test/c/sml_test.c b/utils/test/c/sml_test.c index a8d4fafb03..bf04352232 100644 --- a/utils/test/c/sml_test.c +++ b/utils/test/c/sml_test.c @@ -2253,6 +2253,83 @@ int sml_ts5528_test(){ return 0; } +int sml_td33048_Test() { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + + TAOS_RES *pRes = taos_query(taos, "drop database if exists td33048"); + taos_free_result(pRes); + + pRes = taos_query(taos, "create database if not exists td33048"); + taos_free_result(pRes); + + // check column name duplication + const char *sql[] = { + "alarm_record,tag=alarm_record uid=\"3+8001+c939604c\",deviceId=\"3\",alarmId=\"8001\",alarmStatus=\"false\",lotNo=\"2411A0302\",subMode=\"11\",occurTime=\"2024-11-25 09:31:52.702\" 1732527117484", + }; + pRes = taos_query(taos, "use td33048"); + taos_free_result(pRes); + pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_MILLI_SECONDS); + int code = taos_errno(pRes); + printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); + ASSERT(code == 0); + taos_free_result(pRes); + + // check tag name duplication + const char *sql1[] = { + "alarm_record,tag=alarm_record uid=\"2+100012+303fe9b5\",deviceId=\"2\",alarmId=\"100012\",alarmStatus=\"false\",lotNo=\"2411A0202\",subMode=\"11\",occurTime=\"2024-11-25 09:31:55.591\" 1732527119493", + }; + pRes = taos_schemaless_insert(taos, (char **)sql1, sizeof(sql1) / sizeof(sql1[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_MILLI_SECONDS); + code = taos_errno(pRes); + printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); + ASSERT(code == 0); + taos_free_result(pRes); + + pRes = taos_query(taos, "select * from alarm_record"); + code = taos_errno(pRes); + printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); + ASSERT(code == 0); + taos_free_result(pRes); + + taos_close(taos); + + return code; +} + +int sml_td17324_Test() { + TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); + + TAOS_RES *pRes = taos_query(taos, "drop database if exists gcbacaefqk"); + taos_free_result(pRes); + + pRes = taos_query(taos, "create database if not exists gcbacaefqk PRECISION 'ns'"); + taos_free_result(pRes); + + pRes = taos_query(taos, "use gcbacaefqk"); + taos_free_result(pRes); + + pRes = taos_query(taos, "create stable gcbacaefqk.test_stb(_ts timestamp, f int) tags(t1 bigint)"); + taos_free_result(pRes); + // check column name duplication + const char *sql[] = { + "st123456,t1=3i64,t2=4f64,t3=\"t3\" c1=3i64,c3=L\"passit\",c2=false,c4=4f64 1732700000364000000", + "st123456,t1=4i64,t3=\"t4\",t2=5f64,t4=5f64 c1=3i64,c3=L\"passitagin\",c2=true,c4=5f64,c5=5f64 1732700000361000000", + "test_stb,t2=5f64,t3=L\"ste\" c1=true,c2=4i64,c3=\"iam\" 1732700000364316532" + }; + + pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_LINE_PROTOCOL, + TSDB_SML_TIMESTAMP_NANO_SECONDS); + int code = taos_errno(pRes); + printf("%s result0:%s\n", __FUNCTION__, taos_errstr(pRes)); + ASSERT(code == 0); + taos_free_result(pRes); + + taos_close(taos); + + return code; +} + int main(int argc, char *argv[]) { if (argc == 2) { taos_options(TSDB_OPTION_CONFIGDIR, argv[1]); @@ -2262,6 +2339,10 @@ int main(int argc, char *argv[]) { ASSERT(!ret); ret = sml_ts5528_test(); ASSERT(!ret); + ret = sml_td33048_Test(); + ASSERT(!ret); + ret = sml_td17324_Test(); + ASSERT(!ret); ret = sml_td29691_Test(); ASSERT(ret); ret = sml_td29373_Test(); From ce475566377131110b55c0441baa885dbc5c3add Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 27 Nov 2024 19:55:47 +0800 Subject: [PATCH 23/25] fix:[TD-33048] add ts to cols if dataFormat is true in schemaless to avoid schemal is old --- source/client/src/clientSml.c | 23 ++++++++++++++--------- tests/system-test/2-query/sml_TS-3724.py | 8 ++++---- 2 files changed, 18 insertions(+), 13 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 112fc8f5bc..911e3664f5 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -772,22 +772,27 @@ END: RETURN } -static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols, bool isTag) { +static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols) { int32_t code = TSDB_CODE_SUCCESS; int32_t lino = 0; SHashObj *hashTmp = taosHashInit(length, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, HASH_NO_LOCK); SML_CHECK_NULL(hashTmp); - int32_t i = 0; - for (; i < length; i++) { - SML_CHECK_CODE(taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &i, SHORT_BYTES)); + for (int32_t i = 0; i < length; i++) { + SML_CHECK_CODE(taosHashPut(hashTmp, schema[i].name, strlen(schema[i].name), &schema[i], sizeof(SSchema))); } - i = isTag ? 0 : 1; - for (; i < taosArrayGetSize(cols); i++) { + for (int32_t i = 0; i < taosArrayGetSize(cols); i++) { SSmlKv *kv = (SSmlKv *)taosArrayGet(cols, i); SML_CHECK_NULL(kv); - if (taosHashGet(hashTmp, kv->key, kv->keyLen) == NULL) { + SSchema *sTmp = taosHashGet(hashTmp, kv->key, kv->keyLen); + if (sTmp == NULL) { SML_CHECK_CODE(TSDB_CODE_SML_INVALID_DATA); } + if ((IS_VAR_DATA_TYPE(kv->type) && kv->length + VARSTR_HEADER_SIZE > sTmp->bytes) || + (!IS_VAR_DATA_TYPE(kv->type) && kv->length != sTmp->bytes)){ + uError("column %s (type %s) bytes invalid. db bytes:%d, kv bytes:%zu", sTmp->name, + tDataTypes[sTmp->type].name, sTmp->bytes, kv->length); + SML_CHECK_CODE(TSDB_CODE_INTERNAL_ERROR); + } } END: @@ -1132,8 +1137,8 @@ static int32_t smlModifyDBSchemas(SSmlHandle *info) { } if (needCheckMeta) { - SML_CHECK_CODE(smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags, sTableData->tags, true)); - SML_CHECK_CODE(smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols, false)); + SML_CHECK_CODE(smlCheckMeta(&(pTableMeta->schema[pTableMeta->tableInfo.numOfColumns]), pTableMeta->tableInfo.numOfTags, sTableData->tags)); + SML_CHECK_CODE(smlCheckMeta(&(pTableMeta->schema[0]), pTableMeta->tableInfo.numOfColumns, sTableData->cols)); } taosMemoryFreeClear(sTableData->tableMeta); diff --git a/tests/system-test/2-query/sml_TS-3724.py b/tests/system-test/2-query/sml_TS-3724.py index b537ad9b9a..9511ee8aa1 100644 --- a/tests/system-test/2-query/sml_TS-3724.py +++ b/tests/system-test/2-query/sml_TS-3724.py @@ -14,7 +14,7 @@ sys.path.append("./7-tmq") from tmqCommon import * class TDTestCase: - updatecfgDict = {'clientCfg': {'smlChildTableName': 'dataModelName', 'fqdn': 'localhost', 'smlTsDefaultName': "times"}, 'fqdn': 'localhost'} + updatecfgDict = {'clientCfg': {'smlChildTableName': 'dataModelName', 'fqdn': 'localhost'}, 'fqdn': 'localhost'} print("===================: ", updatecfgDict) def init(self, conn, logSql, replicaVar=1): @@ -58,7 +58,7 @@ class TDTestCase: tdSql.query(f"select distinct tbname from {dbname}.readings") tdSql.checkRows(4) - tdSql.query(f"select * from {dbname}.t_0799064f5487946e5d22164a822acfc8 order by times") + tdSql.query(f"select * from {dbname}.t_0799064f5487946e5d22164a822acfc8 order by _ts") tdSql.checkRows(2) tdSql.checkData(0, 3, "kk") tdSql.checkData(1, 3, "") @@ -67,7 +67,7 @@ class TDTestCase: tdSql.query(f"select distinct tbname from {dbname}.`sys_if_bytes_out`") tdSql.checkRows(2) - tdSql.query(f"select * from {dbname}.t_f67972b49aa8adf8bca5d0d54f0d850d order by times") + tdSql.query(f"select * from {dbname}.t_f67972b49aa8adf8bca5d0d54f0d850d order by _ts") tdSql.checkRows(2) tdSql.checkData(0, 1, 1.300000000) tdSql.checkData(1, 1, 13.000000000) @@ -80,7 +80,7 @@ class TDTestCase: tdSql.query(f"select distinct tbname from {dbname}.`sys_cpu_nice`") tdSql.checkRows(3) - tdSql.query(f"select * from {dbname}.`sys_cpu_nice` order by times") + tdSql.query(f"select * from {dbname}.`sys_cpu_nice` order by _ts") tdSql.checkRows(4) tdSql.checkData(0, 1, 13.000000000) tdSql.checkData(0, 2, "web01") From b036345382183f6227bbea334c13a3391ac7b650 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 27 Nov 2024 22:11:49 +0800 Subject: [PATCH 24/25] fix:[TD-33048] add ts to cols if dataFormat is true in schemaless to avoid schemal is old --- source/client/src/clientSml.c | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 911e3664f5..e1316af9a5 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -787,8 +787,7 @@ static int32_t smlCheckMeta(SSchema *schema, int32_t length, SArray *cols) { if (sTmp == NULL) { SML_CHECK_CODE(TSDB_CODE_SML_INVALID_DATA); } - if ((IS_VAR_DATA_TYPE(kv->type) && kv->length + VARSTR_HEADER_SIZE > sTmp->bytes) || - (!IS_VAR_DATA_TYPE(kv->type) && kv->length != sTmp->bytes)){ + if ((IS_VAR_DATA_TYPE(kv->type) && kv->length + VARSTR_HEADER_SIZE > sTmp->bytes)){ uError("column %s (type %s) bytes invalid. db bytes:%d, kv bytes:%zu", sTmp->name, tDataTypes[sTmp->type].name, sTmp->bytes, kv->length); SML_CHECK_CODE(TSDB_CODE_INTERNAL_ERROR); From 4f2faff91daa893d0f6505a782eff14fe123161f Mon Sep 17 00:00:00 2001 From: xiao-77 Date: Thu, 28 Nov 2024 11:20:22 +0800 Subject: [PATCH 25/25] Fix maybe close undefine fd ptr. --- source/libs/wal/src/walMeta.c | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index 43688347db..b40a9eeefe 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -372,6 +372,7 @@ static int32_t walLogEntriesComplete(const SWal* pWal) { static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) { int32_t code = TSDB_CODE_SUCCESS; + TdFilePtr pFile = NULL; SWalFileInfo* pFileInfo = taosArrayGet(pWal->fileInfoSet, fileIdx); if (!pFileInfo) { TAOS_RETURN(TSDB_CODE_FAILED); @@ -393,7 +394,7 @@ static int32_t walTrimIdxFile(SWal* pWal, int32_t fileIdx) { TAOS_RETURN(TSDB_CODE_SUCCESS); } - TdFilePtr pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE); + pFile = taosOpenFile(fnameStr, TD_FILE_READ | TD_FILE_WRITE); if (pFile == NULL) { code = terrno; goto _exit;