From 1bd9c5337dca35818cd88101038816a4eb47d55c Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Tue, 15 Aug 2023 17:09:23 +0800 Subject: [PATCH 01/15] test: modify testcase on windows --- tests/system-test/1-insert/precisionNS.py | 14 +++++++------- tests/system-test/7-tmq/tmqClientConsLog.py | 5 +++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/tests/system-test/1-insert/precisionNS.py b/tests/system-test/1-insert/precisionNS.py index be8f1e21dc..674234b9b0 100644 --- a/tests/system-test/1-insert/precisionNS.py +++ b/tests/system-test/1-insert/precisionNS.py @@ -224,13 +224,13 @@ class TDTestCase: sql = f"select timediff(ts - {val}b, ts1) from st " self.checkExpect(sql, val) - # init - def init(self, conn, logSql, replicaVar=1): - seed = time.clock_gettime(time.CLOCK_REALTIME) - random.seed(seed) - self.replicaVar = int(replicaVar) - tdLog.debug(f"start to excute {__file__}") - tdSql.init(conn.cursor(), True) + # # init + # def init(self, conn, logSql, replicaVar=1): + # seed = time.clock_gettime(time.CLOCK_REALTIME) + # random.seed(seed) + # self.replicaVar = int(replicaVar) + # tdLog.debug(f"start to excute {__file__}") + # tdSql.init(conn.cursor(), True) # where def checkWhere(self): diff --git a/tests/system-test/7-tmq/tmqClientConsLog.py b/tests/system-test/7-tmq/tmqClientConsLog.py index 7f755726ce..d708e6642c 100644 --- a/tests/system-test/7-tmq/tmqClientConsLog.py +++ b/tests/system-test/7-tmq/tmqClientConsLog.py @@ -93,19 +93,20 @@ class TDTestCase: cfgPath = tdCom.getClientCfgPath() taosLogFile = '%s/../log/taoslog*'%(cfgPath) filterResultFile = '%s/../log/filter'%(cfgPath) - cmdStr = 'grep "process poll rsp, vgId:" %s >> %s'%(taosLogFile, filterResultFile) + cmdStr = 'grep -h "process poll rsp, vgId:" %s >> %s'%(taosLogFile, filterResultFile) tdLog.info(cmdStr) os.system(cmdStr) consumerDict = {} for index, line in enumerate(open(filterResultFile,'r')): + # tdLog.info("row[%d]: %s"%(index, line)) valueList = line.split(',') # for i in range(len(valueList)): # tdLog.info("index[%d]: %s"%(i, valueList[i])) # get consumer id list2 = valueList[0].split(':') - list3 = list2[4].split() + list3 = list2[3].split() consumerId = list3[0] print("consumerId: %s"%(consumerId)) From 1074e02f36c01c8424daf8fc98289cc580f51fc1 Mon Sep 17 00:00:00 2001 From: haoranchen Date: Wed, 16 Aug 2023 13:47:05 +0800 Subject: [PATCH 02/15] Update precisionNS.py --- tests/system-test/1-insert/precisionNS.py | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/tests/system-test/1-insert/precisionNS.py b/tests/system-test/1-insert/precisionNS.py index 674234b9b0..11d79180a9 100644 --- a/tests/system-test/1-insert/precisionNS.py +++ b/tests/system-test/1-insert/precisionNS.py @@ -224,13 +224,13 @@ class TDTestCase: sql = f"select timediff(ts - {val}b, ts1) from st " self.checkExpect(sql, val) - # # init - # def init(self, conn, logSql, replicaVar=1): - # seed = time.clock_gettime(time.CLOCK_REALTIME) - # random.seed(seed) - # self.replicaVar = int(replicaVar) - # tdLog.debug(f"start to excute {__file__}") - # tdSql.init(conn.cursor(), True) + # init + def init(self, conn, logSql, replicaVar=1): + seed = time.time() % 10000 + random.seed(seed) + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), True) # where def checkWhere(self): From 9ff243a854af66e8fbb64222b1951ea5cf74032e Mon Sep 17 00:00:00 2001 From: "chao.feng" Date: Fri, 18 Aug 2023 13:39:43 +0800 Subject: [PATCH 03/15] add tmq test case to support data precision 'us' and 'ns' by charles --- tests/parallel_test/cases.task | 1 + .../system-test/7-tmq/tmqDataPrecisionUnit.py | 139 ++++++++++++++++++ 2 files changed, 140 insertions(+) create mode 100644 tests/system-test/7-tmq/tmqDataPrecisionUnit.py diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 741226f101..5974da6c80 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -128,6 +128,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/dataFromTsdbNWal-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_taosx.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py +,,n,system-test,python3 ./test.py -f 7-tmq/tmqDataPrecisionUnit.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/raw_block_interface_test.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/stbTagFilter-multiCtb.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSubscribeStb-r3.py -N 5 diff --git a/tests/system-test/7-tmq/tmqDataPrecisionUnit.py b/tests/system-test/7-tmq/tmqDataPrecisionUnit.py new file mode 100644 index 0000000000..f050116a1b --- /dev/null +++ b/tests/system-test/7-tmq/tmqDataPrecisionUnit.py @@ -0,0 +1,139 @@ +import sys +import re +import time +import threading +from taos.tmq import * +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +sys.path.append("./7-tmq") +from tmqCommon import * + + +class TDTestCase: + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor(), False) + + self.db_name = "tmq_db" + self.topic_name = "tmq_topic" + self.stable_name = "stb" + self.rows_per_table = 1000 + self.ctb_num = 100 + + def prepareData(self, precisionUnit="ms"): + tdLog.printNoPrefix("======== prepare test env include database, stable, ctables, and insert data: ") + startTS = 1672502400000 + if precisionUnit == "us": + startTS = 1672502400000000 + elif precisionUnit == "ns": + startTS = 1672502400000000000 + + paraDict = { + 'dbName': self.db_name, + 'dropFlag': 1, + 'event': '', + 'vgroups': 4, + 'stbName': self.stable_name, + 'colPrefix': 'c', + 'tagPrefix': 't', + 'colSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1},{'type': 'TIMESTAMP', 'count':1}], + 'tagSchema': [{'type': 'INT', 'count':1},{'type': 'BIGINT', 'count':1},{'type': 'DOUBLE', 'count':1},{'type': 'BINARY', 'len':32, 'count':1},{'type': 'NCHAR', 'len':32, 'count':1}], + 'ctbPrefix': 'ctb', + 'ctbStartIdx': 0, + 'ctbNum': self.ctb_num, + 'rowsPerTbl': self.rows_per_table, + 'batchNum': 100, + 'startTs': startTS, # 2023-01-01 00:00:00.000 + 'pollDelay': 3, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0 + } + + # init the consumer database + tmqCom.initConsumerTable() + + # create testing database、stable、ctables + tdCom.create_database(tdSql, paraDict["dbName"], paraDict["dropFlag"], vgroups=paraDict["vgroups"], replica=self.replicaVar, precision=precisionUnit) + tdLog.info("create database %s successfully" % paraDict["dbName"]) + tmqCom.create_stable(tdSql, dbName=paraDict["dbName"], stbName=paraDict["stbName"]) + tdLog.info("create stable %s successfully" % paraDict["stbName"]) + tmqCom.create_ctable(tdSql, dbName=paraDict["dbName"],stbName=paraDict["stbName"], ctbPrefix=paraDict['ctbPrefix'], + ctbNum=paraDict["ctbNum"], ctbStartIdx=paraDict['ctbStartIdx']) + tdLog.info("create child tables successfully") + + # insert data into tables and wait the async thread exit + tdLog.info("insert data into tables") + pThread = tmqCom.asyncInsertDataByInterlace(paraDict) + pThread.join() + + def run(self): + """Check tmq feature for different data precision unit like "ms、us、ns" + """ + precision_unit = ["ms", "us", "ns"] + for unit in precision_unit: + tdLog.info(f"start to test precision unit {unit}") + self.prepareData(precisionUnit=unit) + # drop database if exists + tdSql.execute(f"drop database if exists {self.db_name}") + self.prepareData(unit) + + # create topic + tdLog.info("create topic from %s" % self.stable_name) + queryString = "select ts, c1, c2 from %s.%s where t4 == 'beijing' or t4 == 'changsha' "%(self.db_name, self.stable_name) + sqlString = "create topic %s as %s" %(self.topic_name, queryString) + tdLog.info("create topic sql: %s"%sqlString) + tdSql.execute(sqlString) + + # save consumer info + consumerId = 0 + expectrowcnt = self.rows_per_table * self.ctb_num + topicList = self.topic_name + ifcheckdata = 0 + ifManualCommit = 0 + keyList = 'group.id:cgrp1,\ + enable.auto.commit:false,\ + auto.commit.interval.ms:6000,\ + auto.offset.reset:earliest' + tmqCom.insertConsumerInfo(consumerId, expectrowcnt, topicList, keyList, ifcheckdata, ifManualCommit) + + # start consume processor + paraDict = { + 'pollDelay': 15, + 'showMsg': 1, + 'showRow': 1, + 'snapshot': 0 + } + tdLog.info("start consume processor") + tmqCom.startTmqSimProcess(pollDelay=paraDict['pollDelay'], dbName=self.db_name, showMsg=paraDict['showMsg'], showRow=paraDict['showRow'], snapshot=paraDict['snapshot']) + + tdLog.info("start to check consume result") + expectRows = 1 + resultList = tmqCom.selectConsumeResult(expectRows) + totalConsumeRows = 0 + for i in range(expectRows): + totalConsumeRows += resultList[i] + + tdSql.query(queryString) + totalRowsFromQuery = tdSql.getRows() + tdLog.info("act consume rows: %d, act query rows: %d "%(totalConsumeRows, totalRowsFromQuery)) + + if totalConsumeRows < totalRowsFromQuery: + tdLog.exit("tmq consume rows error!") + + tmqCom.waitSubscriptionExit(tdSql, self.topic_name) + tdSql.query("drop topic %s" % self.topic_name) + tdSql.execute("drop database %s" % self.db_name) + + def stop(self): + tdSql.execute(f"drop database if exists {self.db_name}") + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) From f01a26f68a22036b986daa4754c223e85de2b17d Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 18 Aug 2023 14:26:58 +0800 Subject: [PATCH 04/15] enh: refactor func names of new Vg prepare actions --- source/dnode/mnode/impl/inc/mndVgroup.h | 2 +- source/dnode/mnode/impl/src/mndDb.c | 6 +++--- source/dnode/mnode/impl/src/mndSma.c | 2 +- source/dnode/mnode/impl/src/mndVgroup.c | 6 +++--- 4 files changed, 8 insertions(+), 8 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 7c2f8b5b65..4dbd2fe7f8 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -37,7 +37,7 @@ int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup); SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId); int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup); int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups); -int32_t mndAddPrepareNewVgAction(SMnode *, STrans *pTrans, SVgObj *pVg); +int32_t mndAddNewVgPrepareAction(SMnode *, STrans *pTrans, SVgObj *pVg); int32_t mndAddCreateVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid); int32_t mndAddAlterVnodeConfirmAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup); int32_t mndAddAlterVnodeAction(SMnode *, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 4f7e80c0a3..e9f04dac52 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -448,9 +448,9 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE; } -static int32_t mndSetPrepareNewVgActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { +static int32_t mndSetNewVgPrepareActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) { - if (mndAddPrepareNewVgAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1; + if (mndAddNewVgPrepareAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1; } return 0; } @@ -633,7 +633,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; mndTransSetOper(pTrans, MND_OPER_CREATE_DB); - if (mndSetPrepareNewVgActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; + if (mndSetNewVgPrepareActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndSma.c b/source/dnode/mnode/impl/src/mndSma.c index e186a8742f..d666f80fd3 100644 --- a/source/dnode/mnode/impl/src/mndSma.c +++ b/source/dnode/mnode/impl/src/mndSma.c @@ -631,7 +631,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea mndTransSetSerial(pTrans); mInfo("trans:%d, used to create sma:%s stream:%s", pTrans->id, pCreate->name, streamObj.name); - if (mndAddPrepareNewVgAction(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; + if (mndAddNewVgPrepareAction(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; if (mndSetCreateSmaVgroupRedoLogs(pMnode, pTrans, &streamObj.fixedSinkVg) != 0) goto _OVER; if (mndSetCreateSmaCommitLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 406392271c..99adaf8f67 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1259,7 +1259,7 @@ int32_t mndAddAlterVnodeConfigAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb return 0; } -int32_t mndAddPrepareNewVgAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) { +int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) { SSdbRaw *pRaw = mndVgroupActionEncode(pVg); if (pRaw == NULL) goto _err; @@ -2380,13 +2380,13 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); int32_t srcVgId = newVg1.vgId; newVg1.vgId = maxVgId; - if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg1) != 0) goto _OVER; + if (mndAddNewVgPrepareAction(pMnode, pTrans, &newVg1) != 0) goto _OVER; if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg1) != 0) goto _OVER; maxVgId++; srcVgId = newVg2.vgId; newVg2.vgId = maxVgId; - if (mndAddPrepareNewVgAction(pMnode, pTrans, &newVg2) != 0) goto _OVER; + if (mndAddNewVgPrepareAction(pMnode, pTrans, &newVg2) != 0) goto _OVER; if (mndAddAlterVnodeHashRangeAction(pMnode, pTrans, srcVgId, &newVg2) != 0) goto _OVER; if (mndAddAlterVnodeConfirmAction(pMnode, pTrans, pDb, &newVg1) != 0) goto _OVER; From 0242d4ce33755c488d85aa8463704fe0728ca70f Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 18 Aug 2023 16:07:15 +0800 Subject: [PATCH 05/15] enh(tsdb/write): timestamp out of range with s3's last tier storage --- source/dnode/vnode/src/tsdb/tsdbWrite.c | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 6e89b47adc..5949b103d5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -14,6 +14,7 @@ */ #include "tsdb.h" +#include "vndCos.h" /** * @brief max key by precision @@ -76,9 +77,18 @@ int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq2 *pMsg) { int32_t code = 0; STsdbKeepCfg *pCfg = &pTsdb->keepCfg; TSKEY now = taosGetTimestamp(pCfg->precision); - TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep1; + TSKEY minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep2; TSKEY maxKey = tsMaxKeyByPrecision[pCfg->precision]; int32_t size = taosArrayGetSize(pMsg->aSubmitTbData); + int32_t nlevel = tfsGetLevel(pTsdb->pVnode->pTfs); + + if (nlevel > 1 && tsS3Enabled) { + if (nlevel == 3) { + minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep1; + } else if (nlevel == 2) { + minKey = now - tsTickPerMin[pCfg->precision] * pCfg->keep0; + } + } for (int32_t i = 0; i < size; ++i) { SSubmitTbData *pData = TARRAY_GET_ELEM(pMsg->aSubmitTbData, i); From e7411183d9b28ab2143679442d1a455f9b220231 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 18 Aug 2023 17:08:24 +0800 Subject: [PATCH 06/15] enh: create SDB_DB entry in prepareAction --- source/dnode/mnode/impl/src/mndDb.c | 13 +++++- source/dnode/mnode/impl/src/mndSync.c | 63 ++++++++++++++++++++------- source/dnode/mnode/sdb/src/sdbHash.c | 2 + 3 files changed, 61 insertions(+), 17 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index e9f04dac52..6517ab826b 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -448,6 +448,16 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE; } +static int32_t mndSetCreateDbPrepareActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { + SSdbRaw *pDbRaw = mndDbActionEncode(pDb); + if (pDbRaw == NULL) return -1; + + STransAction action = {.pRaw = pDbRaw, .msgType = TDMT_MND_CREATE_DB}; + if (mndTransAppendPrepareAction(pTrans, &action) != 0) return -1; + if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1; + return 0; +} + static int32_t mndSetNewVgPrepareActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroups) { for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) { if (mndAddNewVgPrepareAction(pMnode, pTrans, (pVgroups + v)) != 0) return -1; @@ -459,7 +469,7 @@ static int32_t mndSetCreateDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pD SSdbRaw *pDbRaw = mndDbActionEncode(pDb); if (pDbRaw == NULL) return -1; if (mndTransAppendRedolog(pTrans, pDbRaw) != 0) return -1; - if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1; + if (sdbSetRawStatus(pDbRaw, SDB_STATUS_UPDATE) != 0) return -1; for (int32_t v = 0; v < pDb->cfg.numOfVgroups; ++v) { SSdbRaw *pVgRaw = mndVgroupActionEncode(pVgroups + v); @@ -633,6 +643,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; mndTransSetOper(pTrans, MND_OPER_CREATE_DB); + if (mndSetCreateDbPrepareActions(pMnode, pTrans, &dbObj) != 0) goto _OVER; if (mndSetNewVgPrepareActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 68bfe09b5e..ad91a634ae 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -16,6 +16,7 @@ #define _DEFAULT_SOURCE #include "mndSync.h" #include "mndCluster.h" +#include "mndDb.h" #include "mndTrans.h" #include "mndVgroup.h" @@ -74,23 +75,17 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return code; } -static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { - SSdbRow *pRow = NULL; - int32_t code = -1; +static int32_t mndValidateNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + int code = -1; + SSdbRow *pRow = (pMnode->pSdb->decodeFps[SDB_VGROUP])(pAction->pRaw); + if (pRow == NULL) goto _OUT; + SVgObj *pVgroup = sdbGetRowObj(pRow); + if (pVgroup == NULL) goto _OUT; - if (pAction->msgType == TDMT_MND_CREATE_VG) { - pRow = mndVgroupActionDecode(pAction->pRaw); - if (pRow == NULL) goto _OUT; - - SVgObj *pVgroup = sdbGetRowObj(pRow); - if (pVgroup == NULL) goto _OUT; - - int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); - if (maxVgId > pVgroup->vgId) { - mError("trans:%d, failed to satisfy vgroup id %d of prepare action. maxVgId:%d", pTrans->id, pVgroup->vgId, - maxVgId); - goto _OUT; - } + int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); + if (maxVgId > pVgroup->vgId) { + mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId); + goto _OUT; } code = 0; @@ -99,6 +94,42 @@ _OUT: return code; } +static int32_t mndValidateCreateDbPrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + int code = -1; + SSdbRow *pRow = (pMnode->pSdb->decodeFps[SDB_DB])(pAction->pRaw); + if (pRow == NULL) goto _OUT; + SDbObj *pNewDb = sdbGetRowObj(pRow); + if (pNewDb == NULL) goto _OUT; + + SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name); + if (pOldDb != NULL) { + mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name); + sdbRelease(pMnode->pSdb, pOldDb); + goto _OUT; + } + + code = 0; +_OUT: + taosMemoryFreeClear(pRow); + return code; +} + +static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { + int32_t code = 0; + + switch (pAction->pRaw->type) { + case SDB_VGROUP: + code = mndValidateNewVgPrepareAction(pMnode, pTrans, pAction); + break; + case SDB_DB: + code = mndValidateCreateDbPrepareAction(pMnode, pTrans, pAction); + break; + default: + } + + return code; +} + static int32_t mndTransValidatePrepareStage(SMnode *pMnode, STrans *pTrans) { int32_t code = -1; int32_t action = 0; diff --git a/source/dnode/mnode/sdb/src/sdbHash.c b/source/dnode/mnode/sdb/src/sdbHash.c index 258b22d8ee..09743d549a 100644 --- a/source/dnode/mnode/sdb/src/sdbHash.c +++ b/source/dnode/mnode/sdb/src/sdbHash.c @@ -79,6 +79,8 @@ const char *sdbStatusName(ESdbStatus status) { return "dropped"; case SDB_STATUS_INIT: return "init"; + case SDB_STATUS_UPDATE: + return "update"; default: return "undefine"; } From c4a3a5da35a833105eb7d7131a7073eee2436800 Mon Sep 17 00:00:00 2001 From: Minglei Jin Date: Fri, 18 Aug 2023 18:02:53 +0800 Subject: [PATCH 07/15] vnode: fix write to s3 last tier --- source/dnode/vnode/src/vnd/vnodeSvr.c | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index f75c779f4b..17f34b0a34 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -16,6 +16,7 @@ #include "tencode.h" #include "tmsg.h" #include "vnd.h" +#include "vndCos.h" #include "vnode.h" #include "vnodeInt.h" @@ -190,7 +191,18 @@ static int32_t vnodePreProcessSubmitTbData(SVnode *pVnode, SDecoder *pCoder, int } else if (pVnode->config.tsdbCfg.precision == TSDB_TIME_PRECISION_NANO) { now *= 1000000; } - TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * pVnode->config.tsdbCfg.keep2; + + int32_t nlevel = tfsGetLevel(pVnode->pTfs); + int32_t keep = pVnode->config.tsdbCfg.keep2; + if (nlevel > 1 && tsS3Enabled) { + if (nlevel == 3) { + keep = pVnode->config.tsdbCfg.keep1; + } else if (nlevel == 2) { + keep = pVnode->config.tsdbCfg.keep0; + } + } + + TSKEY minKey = now - tsTickPerMin[pVnode->config.tsdbCfg.precision] * keep; TSKEY maxKey = tsMaxKeyByPrecision[pVnode->config.tsdbCfg.precision]; if (submitTbData.flags & SUBMIT_REQ_COLUMN_DATA_FORMAT) { uint64_t nColData; From 9f1c8cc7b88e6baed8fe9e407c8df254e128857b Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 18 Aug 2023 18:21:06 +0800 Subject: [PATCH 08/15] enh: unify validation of prepare actions with cb validateFp --- source/dnode/mnode/impl/src/mndDb.c | 16 ++++++ source/dnode/mnode/impl/src/mndSync.c | 68 +++++++------------------ source/dnode/mnode/impl/src/mndVgroup.c | 13 +++++ source/dnode/mnode/sdb/inc/sdb.h | 3 ++ source/dnode/mnode/sdb/src/sdb.c | 1 + 5 files changed, 51 insertions(+), 50 deletions(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 6517ab826b..73a4e42abf 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -37,6 +37,8 @@ static SSdbRow *mndDbActionDecode(SSdbRaw *pRaw); static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionDelete(SSdb *pSdb, SDbObj *pDb); static int32_t mndDbActionUpdate(SSdb *pSdb, SDbObj *pOld, SDbObj *pNew); +static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj); + static int32_t mndProcessCreateDbReq(SRpcMsg *pReq); static int32_t mndProcessAlterDbReq(SRpcMsg *pReq); static int32_t mndProcessDropDbReq(SRpcMsg *pReq); @@ -59,6 +61,7 @@ int32_t mndInitDb(SMnode *pMnode) { .insertFp = (SdbInsertFp)mndDbActionInsert, .updateFp = (SdbUpdateFp)mndDbActionUpdate, .deleteFp = (SdbDeleteFp)mndDbActionDelete, + .validateFp = (SdbValidateFp)mndNewDbActionValidate, }; mndSetMsgHandle(pMnode, TDMT_MND_CREATE_DB, mndProcessCreateDbReq); @@ -247,6 +250,19 @@ _OVER: return pRow; } +static int32_t mndNewDbActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) { + SDbObj *pNewDb = pObj; + + SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name); + if (pOldDb != NULL) { + mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name); + sdbRelease(pMnode->pSdb, pOldDb); + return -1; + } + + return 0; +} + static int32_t mndDbActionInsert(SSdb *pSdb, SDbObj *pDb) { mTrace("db:%s, perform insert action, row:%p", pDb->name, pDb); return 0; diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index ad91a634ae..3d8fd6220f 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -16,9 +16,7 @@ #define _DEFAULT_SOURCE #include "mndSync.h" #include "mndCluster.h" -#include "mndDb.h" #include "mndTrans.h" -#include "mndVgroup.h" static int32_t mndSyncEqCtrlMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) { if (pMsg == NULL || pMsg->pCont == NULL) { @@ -75,58 +73,28 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return code; } -static int32_t mndValidateNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { - int code = -1; - SSdbRow *pRow = (pMnode->pSdb->decodeFps[SDB_VGROUP])(pAction->pRaw); - if (pRow == NULL) goto _OUT; - SVgObj *pVgroup = sdbGetRowObj(pRow); - if (pVgroup == NULL) goto _OUT; - - int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); - if (maxVgId > pVgroup->vgId) { - mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId); - goto _OUT; - } - - code = 0; -_OUT: - taosMemoryFreeClear(pRow); - return code; -} - -static int32_t mndValidateCreateDbPrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { - int code = -1; - SSdbRow *pRow = (pMnode->pSdb->decodeFps[SDB_DB])(pAction->pRaw); - if (pRow == NULL) goto _OUT; - SDbObj *pNewDb = sdbGetRowObj(pRow); - if (pNewDb == NULL) goto _OUT; - - SDbObj *pOldDb = sdbAcquire(pMnode->pSdb, SDB_DB, pNewDb->name); - if (pOldDb != NULL) { - mError("trans:%d, db name already in use. name: %s", pTrans->id, pNewDb->name); - sdbRelease(pMnode->pSdb, pOldDb); - goto _OUT; - } - - code = 0; -_OUT: - taosMemoryFreeClear(pRow); - return code; -} - static int32_t mndTransValidatePrepareAction(SMnode *pMnode, STrans *pTrans, STransAction *pAction) { - int32_t code = 0; + SSdbRaw *pRaw = pAction->pRaw; + SSdb *pSdb = pMnode->pSdb; + SSdbRow *pRow = NULL; + void *pObj = NULL; + int code = -1; - switch (pAction->pRaw->type) { - case SDB_VGROUP: - code = mndValidateNewVgPrepareAction(pMnode, pTrans, pAction); - break; - case SDB_DB: - code = mndValidateCreateDbPrepareAction(pMnode, pTrans, pAction); - break; - default: + if (pRaw->status != SDB_STATUS_CREATING) goto _OUT; + + pRow = (pSdb->decodeFps[pRaw->type])(pRaw); + if (pRow == NULL) goto _OUT; + pObj = sdbGetRowObj(pRow); + if (pObj == NULL) goto _OUT; + + SdbValidateFp validateFp = pSdb->validateFps[pRaw->type]; + code = 0; + if (validateFp) { + code = validateFp(pMnode, pTrans, pObj); } +_OUT: + taosMemoryFreeClear(pRow); return code; } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 99adaf8f67..0bf524508a 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -33,6 +33,7 @@ static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionDelete(SSdb *pSdb, SVgObj *pVgroup); static int32_t mndVgroupActionUpdate(SSdb *pSdb, SVgObj *pOld, SVgObj *pNew); +static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj); static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static void mndCancelGetNextVgroup(SMnode *pMnode, void *pIter); @@ -53,6 +54,7 @@ int32_t mndInitVgroup(SMnode *pMnode) { .insertFp = (SdbInsertFp)mndVgroupActionInsert, .updateFp = (SdbUpdateFp)mndVgroupActionUpdate, .deleteFp = (SdbDeleteFp)mndVgroupActionDelete, + .validateFp = (SdbValidateFp)mndNewVgActionValidate, }; mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndTransProcessRsp); @@ -171,6 +173,17 @@ _OVER: return pRow; } +static int32_t mndNewVgActionValidate(SMnode *pMnode, STrans *pTrans, void *pObj) { + SVgObj *pVgroup = pObj; + + int32_t maxVgId = sdbGetMaxId(pMnode->pSdb, SDB_VGROUP); + if (maxVgId > pVgroup->vgId) { + mError("trans:%d, vgroup id %d already in use. maxVgId:%d", pTrans->id, pVgroup->vgId, maxVgId); + return -1; + } + return 0; +} + static int32_t mndVgroupActionInsert(SSdb *pSdb, SVgObj *pVgroup) { mTrace("vgId:%d, perform insert action, row:%p", pVgroup->vgId, pVgroup); return 0; diff --git a/source/dnode/mnode/sdb/inc/sdb.h b/source/dnode/mnode/sdb/inc/sdb.h index 3c96d8a2fd..695373d220 100644 --- a/source/dnode/mnode/sdb/inc/sdb.h +++ b/source/dnode/mnode/sdb/inc/sdb.h @@ -106,6 +106,7 @@ typedef int32_t (*SdbInsertFp)(SSdb *pSdb, void *pObj); typedef int32_t (*SdbUpdateFp)(SSdb *pSdb, void *pSrcObj, void *pDstObj); typedef int32_t (*SdbDeleteFp)(SSdb *pSdb, void *pObj, bool callFunc); typedef int32_t (*SdbDeployFp)(SMnode *pMnode); +typedef int32_t (*SdbValidateFp)(SMnode *pMnode, void *pTrans, void *pObj); typedef SSdbRow *(*SdbDecodeFp)(SSdbRaw *pRaw); typedef SSdbRaw *(*SdbEncodeFp)(void *pObj); typedef bool (*sdbTraverseFp)(SMnode *pMnode, void *pObj, void *p1, void *p2, void *p3); @@ -189,6 +190,7 @@ typedef struct SSdb { SdbDeployFp deployFps[SDB_MAX]; SdbEncodeFp encodeFps[SDB_MAX]; SdbDecodeFp decodeFps[SDB_MAX]; + SdbValidateFp validateFps[SDB_MAX]; TdThreadMutex filelock; } SSdb; @@ -207,6 +209,7 @@ typedef struct { SdbInsertFp insertFp; SdbUpdateFp updateFp; SdbDeleteFp deleteFp; + SdbValidateFp validateFp; } SSdbTable; typedef struct SSdbOpt { diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 9797dd8337..c4b32fe87c 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -121,6 +121,7 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { pSdb->deployFps[sdbType] = table.deployFp; pSdb->encodeFps[sdbType] = table.encodeFp; pSdb->decodeFps[sdbType] = table.decodeFp; + pSdb->validateFps[sdbType] = table.validateFp; int32_t hashType = 0; if (keyType == SDB_KEY_INT32) { From 0b4c753b159ac0cf083a3f78237776aef277dbc8 Mon Sep 17 00:00:00 2001 From: haoranchen Date: Fri, 18 Aug 2023 18:33:06 +0800 Subject: [PATCH 09/15] Update test.py --- tests/system-test/test.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/system-test/test.py b/tests/system-test/test.py index 7082bb0f22..b22850bc71 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -199,7 +199,7 @@ if __name__ == "__main__": createDnodeNums = value if key in ['-i', '--independentMnode']: - independentMnode = value + independentMnode = False if key in ['-R', '--restful']: restful = True From f30d1ebacdcc1d70706ead27a4d19010d03afebf Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Fri, 18 Aug 2023 18:35:44 +0800 Subject: [PATCH 10/15] enh: refactor func name as mndTransAppendPrepareLog --- source/dnode/mnode/impl/inc/mndTrans.h | 4 +++- source/dnode/mnode/impl/src/mndDb.c | 7 +++---- source/dnode/mnode/impl/src/mndTrans.c | 9 ++++----- source/dnode/mnode/impl/src/mndVgroup.c | 3 +-- 4 files changed, 11 insertions(+), 12 deletions(-) diff --git a/source/dnode/mnode/impl/inc/mndTrans.h b/source/dnode/mnode/impl/inc/mndTrans.h index 625546aa55..04544da80e 100644 --- a/source/dnode/mnode/impl/inc/mndTrans.h +++ b/source/dnode/mnode/impl/inc/mndTrans.h @@ -66,11 +66,13 @@ void mndReleaseTrans(SMnode *pMnode, STrans *pTrans); STrans *mndTransCreate(SMnode *pMnode, ETrnPolicy policy, ETrnConflct conflict, const SRpcMsg *pReq, const char *opername); void mndTransDrop(STrans *pTrans); + +int32_t mndTransAppendPrepareLog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendRedolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendUndolog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw); int32_t mndTransAppendNullLog(STrans *pTrans); -int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction); + int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction); int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction); void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 73a4e42abf..56cea10b32 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -464,12 +464,11 @@ static void mndSetDefaultDbCfg(SDbCfg *pCfg) { if (pCfg->tsdbPageSize <= 0) pCfg->tsdbPageSize = TSDB_DEFAULT_TSDB_PAGESIZE; } -static int32_t mndSetCreateDbPrepareActions(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { +static int32_t mndSetCreateDbPrepareAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { SSdbRaw *pDbRaw = mndDbActionEncode(pDb); if (pDbRaw == NULL) return -1; - STransAction action = {.pRaw = pDbRaw, .msgType = TDMT_MND_CREATE_DB}; - if (mndTransAppendPrepareAction(pTrans, &action) != 0) return -1; + if (mndTransAppendPrepareLog(pTrans, pDbRaw) != 0) return -1; if (sdbSetRawStatus(pDbRaw, SDB_STATUS_CREATING) != 0) return -1; return 0; } @@ -659,7 +658,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER; mndTransSetOper(pTrans, MND_OPER_CREATE_DB); - if (mndSetCreateDbPrepareActions(pMnode, pTrans, &dbObj) != 0) goto _OVER; + if (mndSetCreateDbPrepareAction(pMnode, pTrans, &dbObj) != 0) goto _OVER; if (mndSetNewVgPrepareActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 7ebaf6dda5..02d9368595 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -655,11 +655,10 @@ int32_t mndTransAppendCommitlog(STrans *pTrans, SSdbRaw *pRaw) { return mndTransAppendAction(pTrans->commitActions, &action); } -int32_t mndTransAppendPrepareAction(STrans *pTrans, STransAction *pAction) { - pAction->stage = TRN_STAGE_PREPARE; - pAction->actionType = TRANS_ACTION_RAW; - pAction->mTraceId = pTrans->mTraceId; - return mndTransAppendAction(pTrans->prepareActions, pAction); +int32_t mndTransAppendPrepareLog(STrans *pTrans, SSdbRaw *pRaw) { + STransAction action = { + .pRaw = pRaw, .stage = TRN_STAGE_PREPARE, .actionType = TRANS_ACTION_RAW, .mTraceId = pTrans->mTraceId}; + return mndTransAppendAction(pTrans->prepareActions, &action); } int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction) { diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 0bf524508a..e0156db67c 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1276,8 +1276,7 @@ int32_t mndAddNewVgPrepareAction(SMnode *pMnode, STrans *pTrans, SVgObj *pVg) { SSdbRaw *pRaw = mndVgroupActionEncode(pVg); if (pRaw == NULL) goto _err; - STransAction action = {.pRaw = pRaw, .msgType = TDMT_MND_CREATE_VG}; - if (mndTransAppendPrepareAction(pTrans, &action) != 0) goto _err; + if (mndTransAppendPrepareLog(pTrans, pRaw) != 0) goto _err; (void)sdbSetRawStatus(pRaw, SDB_STATUS_CREATING); pRaw = NULL; return 0; From d3ce0c1f80472bba65f1705ae2845a0aee9dcf4b Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Mon, 21 Aug 2023 11:42:57 +0800 Subject: [PATCH 11/15] fix: ensure creating status of create-db before completion --- source/dnode/mnode/impl/src/mndDb.c | 1 - 1 file changed, 1 deletion(-) diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 56cea10b32..20b342f9e3 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -660,7 +660,6 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate, mndTransSetOper(pTrans, MND_OPER_CREATE_DB); if (mndSetCreateDbPrepareAction(pMnode, pTrans, &dbObj) != 0) goto _OVER; if (mndSetNewVgPrepareActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; - if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups, pNewUserDuped) != 0) goto _OVER; if (mndSetCreateDbRedoActions(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER; From 48b23f735cf9b095bbeb8357e5c8a8c1f6e25480 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Mon, 21 Aug 2023 15:28:29 +0800 Subject: [PATCH 12/15] test:kill tmq_sim process when start cases in windows --- tests/pytest/util/dnodes.py | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/tests/pytest/util/dnodes.py b/tests/pytest/util/dnodes.py index 89e3df81b9..8633fc660f 100644 --- a/tests/pytest/util/dnodes.py +++ b/tests/pytest/util/dnodes.py @@ -812,6 +812,15 @@ class TDDnodes: time.sleep(1) processID = subprocess.check_output( psCmd, shell=True).decode("utf-8").strip() + psCmd = "for /f %a in ('wmic process where \"name='tmq_sim'\" get processId ^| xargs echo ^| awk '{print $2}' ^&^& echo aa') do @(ps | grep %a | awk '{print $1}' | xargs)" + processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip() + while(processID): + print(processID) + killCmd = "kill -9 %s > nul 2>&1" % processID + os.system(killCmd) + time.sleep(1) + processID = subprocess.check_output( + psCmd, shell=True).decode("utf-8").strip() else: psCmd = "ps -ef | grep -w taosd | grep 'root' | grep -v grep| grep -v defunct | awk '{print $2}' | xargs" processID = subprocess.check_output(psCmd, shell=True).decode("utf-8").strip() From af9a73d1e35d81e0324d93a59fe8d213b1332d25 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Mon, 21 Aug 2023 15:39:06 +0800 Subject: [PATCH 13/15] test:repire test case for windows --- tests/parallel_test/cases.task | 2 +- tests/parallel_test/split_case.sh | 2 ++ tests/script/win-test-file | 49 +++---------------------------- tests/system-test/win-test-file | 4 +++ 4 files changed, 11 insertions(+), 46 deletions(-) diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 94b00c7b2a..78cdb63b64 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -455,7 +455,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 #,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 -n 3 ,,n,system-test,python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3 -,,n,system-test,python ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1 +,,n,system-test,python3 ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 ,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3 #,,y,system-test,./pytest.sh python3 ./test.py -f 6-cluster/5dnode3mnodeDrop.py -N 5 diff --git a/tests/parallel_test/split_case.sh b/tests/parallel_test/split_case.sh index 4e2c535faf..e237cbf984 100755 --- a/tests/parallel_test/split_case.sh +++ b/tests/parallel_test/split_case.sh @@ -5,8 +5,10 @@ parm_path=$(pwd ${parm_path}) echo "execute path:${parm_path}" cd ${parm_path} cp cases.task ${case_file} +# comment udf and stream case in windows sed -i '/udf/d' ${case_file} sed -i '/Udf/d' ${case_file} +sed -i '/stream/d' ${case_file} sed -i '/^$/d' ${case_file} sed -i '$a\%%FINISHED%%' ${case_file} diff --git a/tests/script/win-test-file b/tests/script/win-test-file index dc3093e0ea..4d578a93cd 100644 --- a/tests/script/win-test-file +++ b/tests/script/win-test-file @@ -26,8 +26,10 @@ ./test.sh -f tsim/user/basic.sim ./test.sh -f tsim/user/password.sim ./test.sh -f tsim/user/privilege_db.sim +./test.sh -f tsim/user/privilege_sysinfo.sim ./test.sh -f tsim/user/privilege_topic.sim ./test.sh -f tsim/user/privilege_table.sim +./test.sh -f tsim/user/privilege_create_db.sim ./test.sh -f tsim/db/alter_option.sim ./test.sh -f tsim/db/alter_replica_31.sim ./test.sh -f tsim/db/basic1.sim @@ -183,6 +185,7 @@ ./test.sh -f tsim/query/scalarNull.sim ./test.sh -f tsim/query/session.sim ./test.sh -f tsim/query/join_interval.sim +./test.sh -f tsim/query/join_pk.sim ./test.sh -f tsim/query/unionall_as_table.sim ./test.sh -f tsim/query/multi_order_by.sim ./test.sh -f tsim/query/sys_tbname.sim @@ -197,6 +200,7 @@ ./test.sh -f tsim/query/tag_scan.sim ./test.sh -f tsim/query/nullColSma.sim ./test.sh -f tsim/query/bug3398.sim +./test.sh -f tsim/query/explain_tsorder.sim ./test.sh -f tsim/qnode/basic1.sim ./test.sh -f tsim/snode/basic1.sim ./test.sh -f tsim/mnode/basic1.sim @@ -233,51 +237,6 @@ ./test.sh -f tsim/table/table.sim ./test.sh -f tsim/table/tinyint.sim ./test.sh -f tsim/table/vgroup.sim -./test.sh -f tsim/stream/basic0.sim -g -./test.sh -f tsim/stream/basic1.sim -./test.sh -f tsim/stream/basic2.sim -./test.sh -f tsim/stream/basic3.sim -./test.sh -f tsim/stream/basic4.sim -./test.sh -f tsim/stream/checkStreamSTable1.sim -./test.sh -f tsim/stream/checkStreamSTable.sim -./test.sh -f tsim/stream/deleteInterval.sim -./test.sh -f tsim/stream/deleteSession.sim -./test.sh -f tsim/stream/deleteState.sim -./test.sh -f tsim/stream/distributeInterval0.sim -./test.sh -f tsim/stream/distributeIntervalRetrive0.sim -./test.sh -f tsim/stream/distributeSession0.sim -./test.sh -f tsim/stream/drop_stream.sim -./test.sh -f tsim/stream/fillHistoryBasic1.sim -./test.sh -f tsim/stream/fillHistoryBasic2.sim -./test.sh -f tsim/stream/fillHistoryBasic3.sim -./test.sh -f tsim/stream/fillIntervalDelete0.sim -./test.sh -f tsim/stream/fillIntervalDelete1.sim -./test.sh -f tsim/stream/fillIntervalLinear.sim -./test.sh -f tsim/stream/fillIntervalPartitionBy.sim -./test.sh -f tsim/stream/fillIntervalPrevNext1.sim -./test.sh -f tsim/stream/fillIntervalPrevNext.sim -./test.sh -f tsim/stream/fillIntervalRange.sim -./test.sh -f tsim/stream/fillIntervalValue.sim -./test.sh -f tsim/stream/ignoreCheckUpdate.sim -./test.sh -f tsim/stream/ignoreExpiredData.sim -./test.sh -f tsim/stream/partitionby1.sim -./test.sh -f tsim/stream/partitionbyColumnInterval.sim -./test.sh -f tsim/stream/partitionbyColumnSession.sim -./test.sh -f tsim/stream/partitionbyColumnState.sim -./test.sh -f tsim/stream/partitionby.sim -./test.sh -f tsim/stream/pauseAndResume.sim -./test.sh -f tsim/stream/schedSnode.sim -./test.sh -f tsim/stream/session0.sim -./test.sh -f tsim/stream/session1.sim -./test.sh -f tsim/stream/sliding.sim -./test.sh -f tsim/stream/state0.sim -./test.sh -f tsim/stream/state1.sim -./test.sh -f tsim/stream/triggerInterval0.sim -./test.sh -f tsim/stream/triggerSession0.sim -./test.sh -f tsim/stream/udTableAndTag0.sim -./test.sh -f tsim/stream/udTableAndTag1.sim -./test.sh -f tsim/stream/udTableAndTag2.sim -./test.sh -f tsim/stream/windowClose.sim ./test.sh -f tsim/trans/lossdata1.sim ./test.sh -f tsim/tmq/basic1.sim ./test.sh -f tsim/tmq/basic2.sim diff --git a/tests/system-test/win-test-file b/tests/system-test/win-test-file index 0f644666cb..adea684ef0 100644 --- a/tests/system-test/win-test-file +++ b/tests/system-test/win-test-file @@ -17,6 +17,7 @@ python3 ./test.py -f 2-query/nestedQuery_str.py -Q 4 python3 ./test.py -f 2-query/nestedQuery_math.py -Q 4 python3 ./test.py -f 2-query/nestedQuery_time.py -Q 4 python3 ./test.py -f 2-query/nestedQuery_26.py -Q 4 +python3 ./test.py -f 2-query/interval_limit_opt.py -Q 4 python3 ./test.py -f 7-tmq/tmqShow.py python3 ./test.py -f 7-tmq/tmqDropStb.py python3 ./test.py -f 7-tmq/subscribeStb0.py @@ -133,6 +134,8 @@ python3 ./test.py -f 0-others/sysinfo.py python3 ./test.py -f 0-others/user_control.py python3 ./test.py -f 0-others/user_manage.py python3 ./test.py -f 0-others/user_privilege.py +python3 ./test.py -f 0-others/user_privilege_show.py +python3 ./test.py -f 0-others/user_privilege_all.py python3 ./test.py -f 0-others/fsync.py python3 ./test.py -f 0-others/multilevel.py python3 ./test.py -f 0-others/compatibility.py @@ -421,6 +424,7 @@ python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertData.py -N 6 -M 3 -n 3 python3 ./test.py -f 6-cluster/5dnode3mnodeRestartDnodeInsertDataAsync.py -N 6 -M 3 python3 ./test.py -f 6-cluster/manually-test/6dnode3mnodeInsertLessDataAlterRep3to1to3.py -N 6 -M 3 +python3 ./test.py -f 6-cluster/5dnode3mnodeRoll.py -N 3 -C 1 python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 python3 ./test.py -f 6-cluster/5dnode3mnodeAdd1Ddnoe.py -N 7 -M 3 -C 6 -n 3 python3 ./test.py -f 6-cluster/5dnode3mnodeRecreateMnode.py -N 6 -M 3 From b1e91b554f2de82fef3a573e4892da40493853b8 Mon Sep 17 00:00:00 2001 From: chenhaoran Date: Mon, 21 Aug 2023 15:40:36 +0800 Subject: [PATCH 14/15] test:repire test case for windows --- tests/system-test/test.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/system-test/test.py b/tests/system-test/test.py index 7082bb0f22..1c50e5bbbe 100644 --- a/tests/system-test/test.py +++ b/tests/system-test/test.py @@ -199,7 +199,7 @@ if __name__ == "__main__": createDnodeNums = value if key in ['-i', '--independentMnode']: - independentMnode = value + independentMnode = False if key in ['-R', '--restful']: restful = True @@ -553,6 +553,7 @@ if __name__ == "__main__": else : # dnode > 1 cluster tdLog.debug("create an cluster with %s nodes and make %s dnode as independent mnode"%(dnodeNums,mnodeNums)) + print(independentMnode,"independentMnode valuse") dnodeslist = cluster.configure_cluster(dnodeNums=dnodeNums, mnodeNums=mnodeNums, independentMnode=independentMnode) tdDnodes = ClusterDnodes(dnodeslist) tdDnodes.init(deployPath, masterIp) From b8d62a86ed5d2f74d30915a6b19c9669d9a808b9 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Mon, 21 Aug 2023 16:29:48 +0800 Subject: [PATCH 15/15] feat: clean useless file --- source/dnode/vnode/src/tsdb/tsdbFS2.c | 210 +++++++++++++++++++++++- source/dnode/vnode/src/tsdb/tsdbFile2.c | 2 +- 2 files changed, 203 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbFS2.c b/source/dnode/vnode/src/tsdb/tsdbFS2.c index afa294d3b0..7f843070d6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS2.c @@ -17,12 +17,24 @@ #include "tsdbUpgrade.h" #include "vnd.h" -extern int vnodeScheduleTask(int (*execute)(void *), void *arg); -extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg); +extern int vnodeScheduleTask(int (*execute)(void *), void *arg); +extern int vnodeScheduleTaskEx(int tpid, int (*execute)(void *), void *arg); +extern void remove_file(const char *fname); #define TSDB_FS_EDIT_MIN TSDB_FEDIT_COMMIT #define TSDB_FS_EDIT_MAX (TSDB_FEDIT_MERGE + 1) +typedef struct STFileHashEntry { + struct STFileHashEntry *next; + char fname[TSDB_FILENAME_LEN]; +} STFileHashEntry; + +typedef struct { + int32_t numFile; + int32_t numBucket; + STFileHashEntry **buckets; +} STFileHash; + enum { TSDB_FS_STATE_NONE = 0, TSDB_FS_STATE_OPEN, @@ -315,10 +327,8 @@ _exit: } // static int32_t -static int32_t apply_abort(STFileSystem *fs) { - // TODO - return 0; -} +static int32_t tsdbFSDoSanAndFix(STFileSystem *fs); +static int32_t apply_abort(STFileSystem *fs) { return tsdbFSDoSanAndFix(fs); } static int32_t abort_edit(STFileSystem *fs) { char fname[TSDB_FILENAME_LEN]; @@ -349,6 +359,180 @@ _exit: return code; } +static int32_t tsdbFSDoScanAndFixFile(STFileSystem *fs, const STFileObj *fobj) { + int32_t code = 0; + int32_t lino = 0; + + // check file existence + if (!taosCheckExistFile(fobj->fname)) { + code = TSDB_CODE_FILE_CORRUPTED; + tsdbError("vgId:%d %s failed since file:%s does not exist", TD_VID(fs->tsdb->pVnode), __func__, fobj->fname); + return code; + } + + { // TODO: check file size + // int64_t fsize; + // if (taosStatFile(fobj->fname, &fsize, NULL, NULL) < 0) { + // code = TAOS_SYSTEM_ERROR(terrno); + // tsdbError("vgId:%d %s failed since file:%s stat failed, reason:%s", TD_VID(fs->tsdb->pVnode), __func__, + // fobj->fname, tstrerror(code)); + // return code; + // } + } + + return 0; +} + +static void tsdbFSDestroyFileObjHash(STFileHash *hash); + +static int32_t tsdbFSAddEntryToFileObjHash(STFileHash *hash, const char *fname) { + STFileHashEntry *entry = taosMemoryMalloc(sizeof(*entry)); + if (entry == NULL) return TSDB_CODE_OUT_OF_MEMORY; + + strcpy(entry->fname, fname); + + uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket; + + entry->next = hash->buckets[idx]; + hash->buckets[idx] = entry; + hash->numFile++; + + return 0; +} + +static int32_t tsdbFSCreateFileObjHash(STFileSystem *fs, STFileHash *hash) { + int32_t code = 0; + char fname[TSDB_FILENAME_LEN]; + + // init hash table + hash->numFile = 0; + hash->numBucket = 4096; + hash->buckets = taosMemoryCalloc(hash->numBucket, sizeof(STFileHashEntry *)); + if (hash->buckets == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + return code; + } + + // vnode.json + current_fname(fs->tsdb, fname, TSDB_FCURRENT); + code = tsdbFSAddEntryToFileObjHash(hash, fname); + if (code) goto _exit; + + // other + STFileSet *fset = NULL; + TARRAY2_FOREACH(fs->fSetArr, fset) { + // data file + for (int32_t i = 0; i < TSDB_FTYPE_MAX; i++) { + if (fset->farr[i] != NULL) { + code = tsdbFSAddEntryToFileObjHash(hash, fset->farr[i]->fname); + if (code) goto _exit; + } + } + + // stt file + SSttLvl *lvl = NULL; + TARRAY2_FOREACH(fset->lvlArr, lvl) { + STFileObj *fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + code = tsdbFSAddEntryToFileObjHash(hash, fobj->fname); + if (code) goto _exit; + } + } + } + +_exit: + if (code) { + tsdbFSDestroyFileObjHash(hash); + } + return code; +} + +static const STFileHashEntry *tsdbFSGetFileObjHashEntry(STFileHash *hash, const char *fname) { + uint32_t idx = MurmurHash3_32(fname, strlen(fname)) % hash->numBucket; + + STFileHashEntry *entry = hash->buckets[idx]; + while (entry) { + if (strcmp(entry->fname, fname) == 0) { + return entry; + } + entry = entry->next; + } + + return NULL; +} + +static void tsdbFSDestroyFileObjHash(STFileHash *hash) { + for (int32_t i = 0; i < hash->numBucket; i++) { + STFileHashEntry *entry = hash->buckets[i]; + while (entry) { + STFileHashEntry *next = entry->next; + taosMemoryFree(entry); + entry = next; + } + } + taosMemoryFree(hash->buckets); + memset(hash, 0, sizeof(*hash)); +} + +static int32_t tsdbFSDoSanAndFix(STFileSystem *fs) { + int32_t code = 0; + int32_t lino = 0; + + { // scan each file + STFileSet *fset = NULL; + TARRAY2_FOREACH(fs->fSetArr, fset) { + // data file + for (int32_t ftype = 0; ftype < TSDB_FTYPE_MAX; ftype++) { + if (fset->farr[ftype] == NULL) continue; + code = tsdbFSDoScanAndFixFile(fs, fset->farr[ftype]); + TSDB_CHECK_CODE(code, lino, _exit); + } + + // stt file + SSttLvl *lvl; + TARRAY2_FOREACH(fset->lvlArr, lvl) { + STFileObj *fobj; + TARRAY2_FOREACH(lvl->fobjArr, fobj) { + code = tsdbFSDoScanAndFixFile(fs, fobj); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + } + } + + { // clear unreferenced files + STfsDir *dir = tfsOpendir(fs->tsdb->pVnode->pTfs, fs->tsdb->path); + if (dir == NULL) { + code = TAOS_SYSTEM_ERROR(terrno); + lino = __LINE__; + goto _exit; + } + + STFileHash fobjHash = {0}; + code = tsdbFSCreateFileObjHash(fs, &fobjHash); + if (code) goto _close_dir; + + for (const STfsFile *file = NULL; (file = tfsReaddir(dir)) != NULL;) { + if (taosIsDir(file->aname)) continue; + + if (tsdbFSGetFileObjHashEntry(&fobjHash, file->aname) == NULL) { + remove_file(file->aname); + } + } + + tsdbFSDestroyFileObjHash(&fobjHash); + + _close_dir: + tfsClosedir(dir); + } + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code); + } + return code; +} + static int32_t tsdbFSScanAndFix(STFileSystem *fs) { fs->neid = 0; @@ -356,8 +540,18 @@ static int32_t tsdbFSScanAndFix(STFileSystem *fs) { const STFileSet *fset; TARRAY2_FOREACH(fs->fSetArr, fset) { fs->neid = TMAX(fs->neid, tsdbTFileSetMaxCid(fset)); } - // TODO - return 0; + // scan and fix + int32_t code = 0; + int32_t lino = 0; + + code = tsdbFSDoSanAndFix(fs); + TSDB_CHECK_CODE(code, lino, _exit); + +_exit: + if (code) { + TSDB_ERROR_LOG(TD_VID(fs->tsdb->pVnode), lino, code); + } + return code; } static int32_t tsdbFSDupState(STFileSystem *fs) { diff --git a/source/dnode/vnode/src/tsdb/tsdbFile2.c b/source/dnode/vnode/src/tsdb/tsdbFile2.c index be021169cd..585316469a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile2.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile2.c @@ -41,7 +41,7 @@ static const struct { [TSDB_FTYPE_STT] = {"stt", stt_to_json, stt_from_json}, }; -static void remove_file(const char *fname) { +void remove_file(const char *fname) { taosRemoveFile(fname); tsdbInfo("file:%s is removed", fname); }