From 5d86becdbff1e490c8a7b122ac5a7e25ec75753c Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 7 Jan 2022 18:55:12 -0800 Subject: [PATCH 1/7] rename some files --- source/dnode/mgmt/impl/test/CMakeLists.txt | 5 +---- source/dnode/mgmt/impl/test/{vgroup => vnode}/CMakeLists.txt | 0 .../mgmt/impl/test/{vgroup/vgroup.cpp => vnode/vnode.cpp} | 0 source/dnode/mnode/impl/test/CMakeLists.txt | 2 ++ source/dnode/{mgmt => mnode}/impl/test/db/CMakeLists.txt | 0 source/dnode/{mgmt => mnode}/impl/test/db/db.cpp | 0 source/dnode/{mgmt => mnode}/impl/test/stb/CMakeLists.txt | 0 source/dnode/{mgmt => mnode}/impl/test/stb/stb.cpp | 0 8 files changed, 3 insertions(+), 4 deletions(-) rename source/dnode/mgmt/impl/test/{vgroup => vnode}/CMakeLists.txt (100%) rename source/dnode/mgmt/impl/test/{vgroup/vgroup.cpp => vnode/vnode.cpp} (100%) rename source/dnode/{mgmt => mnode}/impl/test/db/CMakeLists.txt (100%) rename source/dnode/{mgmt => mnode}/impl/test/db/db.cpp (100%) rename source/dnode/{mgmt => mnode}/impl/test/stb/CMakeLists.txt (100%) rename source/dnode/{mgmt => mnode}/impl/test/stb/stb.cpp (100%) diff --git a/source/dnode/mgmt/impl/test/CMakeLists.txt b/source/dnode/mgmt/impl/test/CMakeLists.txt index 8f721dae9a..ce93a14d3f 100644 --- a/source/dnode/mgmt/impl/test/CMakeLists.txt +++ b/source/dnode/mgmt/impl/test/CMakeLists.txt @@ -4,8 +4,5 @@ add_subdirectory(qnode) add_subdirectory(bnode) add_subdirectory(snode) add_subdirectory(mnode) -add_subdirectory(db) -add_subdirectory(stb) -add_subdirectory(vgroup) - +add_subdirectory(vnode) add_subdirectory(sut) diff --git a/source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt b/source/dnode/mgmt/impl/test/vnode/CMakeLists.txt similarity index 100% rename from source/dnode/mgmt/impl/test/vgroup/CMakeLists.txt rename to source/dnode/mgmt/impl/test/vnode/CMakeLists.txt diff --git a/source/dnode/mgmt/impl/test/vgroup/vgroup.cpp b/source/dnode/mgmt/impl/test/vnode/vnode.cpp similarity index 100% rename from source/dnode/mgmt/impl/test/vgroup/vgroup.cpp rename to source/dnode/mgmt/impl/test/vnode/vnode.cpp diff --git a/source/dnode/mnode/impl/test/CMakeLists.txt b/source/dnode/mnode/impl/test/CMakeLists.txt index 4d37bb368d..3ca35d58a7 100644 --- a/source/dnode/mnode/impl/test/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/CMakeLists.txt @@ -10,3 +10,5 @@ add_subdirectory(show) add_subdirectory(profile) add_subdirectory(dnode) add_subdirectory(mnode) +add_subdirectory(db) +add_subdirectory(stb) diff --git a/source/dnode/mgmt/impl/test/db/CMakeLists.txt b/source/dnode/mnode/impl/test/db/CMakeLists.txt similarity index 100% rename from source/dnode/mgmt/impl/test/db/CMakeLists.txt rename to source/dnode/mnode/impl/test/db/CMakeLists.txt diff --git a/source/dnode/mgmt/impl/test/db/db.cpp b/source/dnode/mnode/impl/test/db/db.cpp similarity index 100% rename from source/dnode/mgmt/impl/test/db/db.cpp rename to source/dnode/mnode/impl/test/db/db.cpp diff --git a/source/dnode/mgmt/impl/test/stb/CMakeLists.txt b/source/dnode/mnode/impl/test/stb/CMakeLists.txt similarity index 100% rename from source/dnode/mgmt/impl/test/stb/CMakeLists.txt rename to source/dnode/mnode/impl/test/stb/CMakeLists.txt diff --git a/source/dnode/mgmt/impl/test/stb/stb.cpp b/source/dnode/mnode/impl/test/stb/stb.cpp similarity index 100% rename from source/dnode/mgmt/impl/test/stb/stb.cpp rename to source/dnode/mnode/impl/test/stb/stb.cpp From a973ad3ff4d91ed5a72e29fe7691a4b43a648757 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 7 Jan 2022 20:57:46 -0800 Subject: [PATCH 2/7] minor changes --- .../dnode/mgmt/impl/test/vnode/CMakeLists.txt | 10 ++++---- source/dnode/mgmt/impl/test/vnode/vnode.cpp | 8 +++---- .../dnode/mnode/impl/test/db/CMakeLists.txt | 8 +++---- source/dnode/mnode/impl/test/db/db.cpp | 24 +++++++++---------- source/dnode/mnode/impl/test/stb/stb.cpp | 16 ++++++------- 5 files changed, 33 insertions(+), 33 deletions(-) diff --git a/source/dnode/mgmt/impl/test/vnode/CMakeLists.txt b/source/dnode/mgmt/impl/test/vnode/CMakeLists.txt index b864b0593c..6fb8bb4ba4 100644 --- a/source/dnode/mgmt/impl/test/vnode/CMakeLists.txt +++ b/source/dnode/mgmt/impl/test/vnode/CMakeLists.txt @@ -1,11 +1,11 @@ -aux_source_directory(. VGROUP_SRC) -add_executable(dnode_test_vgroup ${VGROUP_SRC}) +aux_source_directory(. VNODE_SRC) +add_executable(dnode_test_vnode ${VNODE_SRC}) target_link_libraries( - dnode_test_vgroup + dnode_test_vnode PUBLIC sut ) add_test( - NAME dnode_test_vgroup - COMMAND dnode_test_vgroup + NAME dnode_test_vnode + COMMAND dnode_test_vnode ) diff --git a/source/dnode/mgmt/impl/test/vnode/vnode.cpp b/source/dnode/mgmt/impl/test/vnode/vnode.cpp index 7fa3b4ab61..29b6ed2459 100644 --- a/source/dnode/mgmt/impl/test/vnode/vnode.cpp +++ b/source/dnode/mgmt/impl/test/vnode/vnode.cpp @@ -11,9 +11,9 @@ #include "sut.h" -class DndTestVgroup : public ::testing::Test { +class DndTestVnode : public ::testing::Test { protected: - static void SetUpTestSuite() { test.Init("/tmp/dnode_test_vgroup", 9150); } + static void SetUpTestSuite() { test.Init("/tmp/dnode_test_vnode", 9150); } static void TearDownTestSuite() { test.Cleanup(); } static Testbase test; @@ -23,9 +23,9 @@ class DndTestVgroup : public ::testing::Test { void TearDown() override {} }; -Testbase DndTestVgroup::test; +Testbase DndTestVnode::test; -TEST_F(DndTestVgroup, 01_Create_Restart_Drop_Vnode) { +TEST_F(DndTestVnode, 01_Create_Restart_Drop_Vnode) { { for (int i = 0; i < 3; ++i) { int32_t contLen = sizeof(SCreateVnodeMsg); diff --git a/source/dnode/mnode/impl/test/db/CMakeLists.txt b/source/dnode/mnode/impl/test/db/CMakeLists.txt index cb9f1600fc..f0abdf152c 100644 --- a/source/dnode/mnode/impl/test/db/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/db/CMakeLists.txt @@ -1,11 +1,11 @@ aux_source_directory(. DB_SRC) -add_executable(dnode_test_db ${DB_SRC}) +add_executable(mnode_test_db ${DB_SRC}) target_link_libraries( - dnode_test_db + mnode_test_db PUBLIC sut ) add_test( - NAME dnode_test_db - COMMAND dnode_test_db + NAME mnode_test_db + COMMAND mnode_test_db ) diff --git a/source/dnode/mnode/impl/test/db/db.cpp b/source/dnode/mnode/impl/test/db/db.cpp index 3a69ae2305..42cf753c7c 100644 --- a/source/dnode/mnode/impl/test/db/db.cpp +++ b/source/dnode/mnode/impl/test/db/db.cpp @@ -1,19 +1,19 @@ /** * @file db.cpp * @author slguan (slguan@taosdata.com) - * @brief DNODE module db-msg tests - * @version 0.1 - * @date 2021-12-15 + * @brief MNODE module db tests + * @version 1.0 + * @date 2022-01-11 * - * @copyright Copyright (c) 2021 + * @copyright Copyright (c) 2022 * */ #include "sut.h" -class DndTestDb : public ::testing::Test { +class MndTestDb : public ::testing::Test { protected: - static void SetUpTestSuite() { test.Init("/tmp/dnode_test_db", 9040); } + static void SetUpTestSuite() { test.Init("/tmp/mnode_test_db", 9030); } static void TearDownTestSuite() { test.Cleanup(); } static Testbase test; @@ -23,9 +23,9 @@ class DndTestDb : public ::testing::Test { void TearDown() override {} }; -Testbase DndTestDb::test; +Testbase MndTestDb::test; -TEST_F(DndTestDb, 01_ShowDb) { +TEST_F(MndTestDb, 01_ShowDb) { test.SendShowMetaReq(TSDB_MGMT_TABLE_DB, ""); CHECK_META("show databases", 18); CHECK_SCHEMA(0, TSDB_DATA_TYPE_BINARY, TSDB_DB_NAME_LEN - 1 + VARSTR_HEADER_SIZE, "name"); @@ -51,7 +51,7 @@ TEST_F(DndTestDb, 01_ShowDb) { EXPECT_EQ(test.GetShowRows(), 0); } -TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { +TEST_F(MndTestDb, 02_Create_Alter_Drop_Db) { { int32_t contLen = sizeof(SCreateDbMsg); @@ -211,7 +211,7 @@ TEST_F(DndTestDb, 02_Create_Alter_Drop_Db) { EXPECT_EQ(test.GetShowRows(), 0); } -TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { +TEST_F(MndTestDb, 03_Create_Use_Restart_Use_Db) { { int32_t contLen = sizeof(SCreateDbMsg); @@ -281,7 +281,7 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { EXPECT_EQ(pInfo->numOfEps, 1); SEpAddrMsg* pAddr = &pInfo->epAddr[0]; pAddr->port = htons(pAddr->port); - EXPECT_EQ(pAddr->port, 9040); + EXPECT_EQ(pAddr->port, 9030); EXPECT_STREQ(pAddr->fqdn, "localhost"); } @@ -297,7 +297,7 @@ TEST_F(DndTestDb, 03_Create_Use_Restart_Use_Db) { EXPECT_EQ(pInfo->numOfEps, 1); SEpAddrMsg* pAddr = &pInfo->epAddr[0]; pAddr->port = htons(pAddr->port); - EXPECT_EQ(pAddr->port, 9040); + EXPECT_EQ(pAddr->port, 9030); EXPECT_STREQ(pAddr->fqdn, "localhost"); } } diff --git a/source/dnode/mnode/impl/test/stb/stb.cpp b/source/dnode/mnode/impl/test/stb/stb.cpp index d3362c7a9b..55cc030122 100644 --- a/source/dnode/mnode/impl/test/stb/stb.cpp +++ b/source/dnode/mnode/impl/test/stb/stb.cpp @@ -1,19 +1,19 @@ /** * @file stb.cpp * @author slguan (slguan@taosdata.com) - * @brief DNODE module db-msg tests - * @version 0.1 - * @date 2021-12-17 + * @brief MNODE module stb tests + * @version 1.0 + * @date 2022-01-12 * - * @copyright Copyright (c) 2021 + * @copyright Copyright (c) 2022 * */ #include "sut.h" -class DndTestStb : public ::testing::Test { +class MndTestStb : public ::testing::Test { protected: - static void SetUpTestSuite() { test.Init("/tmp/dnode_test_stb", 9101); } + static void SetUpTestSuite() { test.Init("/tmp/mnode_test_stb", 9034); } static void TearDownTestSuite() { test.Cleanup(); } static Testbase test; @@ -23,9 +23,9 @@ class DndTestStb : public ::testing::Test { void TearDown() override {} }; -Testbase DndTestStb::test; +Testbase MndTestStb::test; -TEST_F(DndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { +TEST_F(MndTestStb, 01_Create_Show_Meta_Drop_Restart_Stb) { { int32_t contLen = sizeof(SCreateDbMsg); From c2eadc27060ce70dcd402053f6a96e700d7061fd Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 7 Jan 2022 23:03:06 -0800 Subject: [PATCH 3/7] minor changes --- include/dnode/vnode/vnode.h | 2 ++ source/dnode/mgmt/impl/src/dndVnodes.c | 8 ++++++-- source/dnode/vnode/impl/src/vnodeMain.c | 7 +++---- 3 files changed, 11 insertions(+), 6 deletions(-) diff --git a/include/dnode/vnode/vnode.h b/include/dnode/vnode/vnode.h index af56d69b11..f3ad0b0176 100644 --- a/include/dnode/vnode/vnode.h +++ b/include/dnode/vnode/vnode.h @@ -32,6 +32,8 @@ extern "C" { /* ------------------------ TYPES EXPOSED ------------------------ */ typedef struct SVnode SVnode; typedef struct SVnodeCfg { + int32_t vgId; + /** vnode buffer pool options */ struct { /** write buffer size */ diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 6427ab080a..baa15f9d33 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -412,7 +412,10 @@ static void *dnodeOpenVnodeFunc(void *param) { pMgmt->openVnodes, pMgmt->totalVnodes); dndReportStartup(pDnode, "open-vnodes", stepDesc); - SVnode *pImpl = vnodeOpen(pCfg->path, NULL); + SVnodeCfg vnodeCfg = {0}; + vnodeCfg.vgId = pCfg->vgId; + + SVnode *pImpl = vnodeOpen(pCfg->path, &vnodeCfg); if (pImpl == NULL) { dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); pThread->failed++; @@ -550,6 +553,7 @@ static SCreateVnodeMsg *dndParseCreateVnodeReq(SRpcMsg *rpcMsg) { } static void dndGenerateVnodeCfg(SCreateVnodeMsg *pCreate, SVnodeCfg *pCfg) { + pCfg->vgId = pCreate->vgId; pCfg->wsize = pCreate->cacheBlockSize; pCfg->ssize = pCreate->cacheBlockSize; pCfg->wsize = pCreate->cacheBlockSize; @@ -610,7 +614,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } - SVnode *pImpl = vnodeOpen(wrapperCfg.path, NULL /*pCfg*/); + SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); if (pImpl == NULL) { return -1; } diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index 88175646ae..c87749203a 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -20,13 +20,12 @@ static void vnodeFree(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode); -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfgInput) { SVnode *pVnode = NULL; // Set default options - if (pVnodeCfg == NULL) { - pVnodeCfg = &defaultVnodeOptions; - } + SVnodeCfg *pVnodeCfg = &defaultVnodeOptions; + pVnodeCfg->vgId = pVnodeCfg->vgId; // Validate options if (vnodeValidateOptions(pVnodeCfg) < 0) { From 33a60b60445d595f867b1d45a118d5e8fb9468dc Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Fri, 7 Jan 2022 23:20:18 -0800 Subject: [PATCH 4/7] minor changes --- include/common/tmsg.h | 8 +-- include/util/taoserror.h | 9 ++- source/dnode/mgmt/impl/inc/dndVnodes.h | 12 ++-- source/dnode/mgmt/impl/src/dndVnodes.c | 71 +++++++++++---------- source/dnode/mgmt/impl/test/vnode/vnode.cpp | 14 ++-- source/dnode/mnode/impl/inc/mndVgroup.h | 4 +- source/dnode/mnode/impl/src/mndDb.c | 16 ++--- source/dnode/mnode/impl/src/mndVgroup.c | 8 +-- source/util/src/terror.c | 5 +- 9 files changed, 78 insertions(+), 69 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 2519f4247d..7ff52b83d5 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -805,19 +805,19 @@ typedef struct { int8_t replica; int8_t selfIndex; SReplica replicas[TSDB_MAX_REPLICA]; -} SCreateVnodeMsg, SAlterVnodeMsg; +} SCreateVnodeReq, SAlterVnodeReq; typedef struct { int32_t vgId; int32_t dnodeId; - char db[TSDB_DB_FNAME_LEN]; uint64_t dbUid; -} SDropVnodeMsg, SSyncVnodeMsg, SCompactVnodeMsg; + char db[TSDB_DB_FNAME_LEN]; +} SDropVnodeReq, SSyncVnodeReq, SCompactVnodeReq; typedef struct { int32_t vgId; int8_t accessState; -} SAuthVnodeMsg; +} SAuthVnodeReq; typedef struct { SMsgHead header; diff --git a/include/util/taoserror.h b/include/util/taoserror.h index 56f05ec0bf..80241405a6 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -277,9 +277,12 @@ int32_t* taosGetErrno(); #define TSDB_CODE_DND_BNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0452) #define TSDB_CODE_DND_BNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0453) #define TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0454) -#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0460) -#define TSDB_CODE_DND_VNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0461) -#define TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0462) +#define TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0460) +#define TSDB_CODE_DND_VNODE_NOT_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0461) +#define TSDB_CODE_DND_VNODE_INVALID_OPTION TAOS_DEF_ERROR_CODE(0, 0x0462) +#define TSDB_CODE_DND_VNODE_READ_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0463) +#define TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0464) +#define TSDB_CODE_DND_VNODE_TOO_MANY_VNODES TAOS_DEF_ERROR_CODE(0, 0x0465) // vnode #define TSDB_CODE_VND_ACTION_IN_PROGRESS TAOS_DEF_ERROR_CODE(0, 0x0500) //"Action in progress") diff --git a/source/dnode/mgmt/impl/inc/dndVnodes.h b/source/dnode/mgmt/impl/inc/dndVnodes.h index bf5f0122c1..b5fae62959 100644 --- a/source/dnode/mgmt/impl/inc/dndVnodes.h +++ b/source/dnode/mgmt/impl/inc/dndVnodes.h @@ -29,12 +29,12 @@ void dndProcessVnodeSyncMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeQueryMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); -int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg); +int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq); +int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq); #ifdef __cplusplus } diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index baa15f9d33..1269562968 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -527,8 +527,8 @@ static void dndCloseVnodes(SDnode *pDnode) { dInfo("total vnodes:%d are all closed", numOfVnodes); } -static SCreateVnodeMsg *dndParseCreateVnodeReq(SRpcMsg *rpcMsg) { - SCreateVnodeMsg *pCreate = rpcMsg->pCont; +static SCreateVnodeReq *dndParseCreateVnodeReq(SRpcMsg *rpcMsg) { + SCreateVnodeReq *pCreate = rpcMsg->pCont; pCreate->vgId = htonl(pCreate->vgId); pCreate->dnodeId = htonl(pCreate->dnodeId); pCreate->dbUid = htobe64(pCreate->dbUid); @@ -552,7 +552,7 @@ static SCreateVnodeMsg *dndParseCreateVnodeReq(SRpcMsg *rpcMsg) { return pCreate; } -static void dndGenerateVnodeCfg(SCreateVnodeMsg *pCreate, SVnodeCfg *pCfg) { +static void dndGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) { pCfg->vgId = pCreate->vgId; pCfg->wsize = pCreate->cacheBlockSize; pCfg->ssize = pCreate->cacheBlockSize; @@ -576,7 +576,7 @@ static void dndGenerateVnodeCfg(SCreateVnodeMsg *pCreate, SVnodeCfg *pCfg) { pCfg->walCfg.vgId = pCreate->vgId; } -static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeMsg *pCreate, SWrapperCfg *pCfg) { +static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWrapperCfg *pCfg) { memcpy(pCfg->db, pCreate->db, TSDB_DB_FNAME_LEN); pCfg->dbUid = pCreate->dbUid; pCfg->dropped = 0; @@ -585,20 +585,20 @@ static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeMsg *pCreate, SWra pCfg->vgVersion = pCreate->vgVersion; } -static SDropVnodeMsg *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) { - SDropVnodeMsg *pDrop = rpcMsg->pCont; +static SDropVnodeReq *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) { + SDropVnodeReq *pDrop = rpcMsg->pCont; pDrop->vgId = htonl(pDrop->vgId); return pDrop; } -static SAuthVnodeMsg *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) { - SAuthVnodeMsg *pAuth = rpcMsg->pCont; +static SAuthVnodeReq *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) { + SAuthVnodeReq *pAuth = rpcMsg->pCont; pAuth->vgId = htonl(pAuth->vgId); return pAuth; } -int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { - SCreateVnodeMsg *pCreate = dndParseCreateVnodeReq(rpcMsg); +int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SCreateVnodeReq *pCreate = dndParseCreateVnodeReq(pReq); dDebug("vgId:%d, create vnode req is received", pCreate->vgId); SVnodeCfg vnodeCfg = {0}; @@ -611,16 +611,19 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { if (pVnode != NULL) { dDebug("vgId:%d, already exist, return success", pCreate->vgId); dndReleaseVnode(pDnode, pVnode); - return 0; + terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED; + return -1; } SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); if (pImpl == NULL) { + dError("vgId:%d, failed to create vnode since %s", pCreate->vgId, terrstr()); return -1; } int32_t code = dndOpenVnode(pDnode, &wrapperCfg, pImpl); if (code != 0) { + dError("vgId:%d, failed to open vnode since %s", pCreate->vgId, terrstr()); vnodeClose(pImpl); vnodeDestroy(wrapperCfg.path); terrno = code; @@ -638,8 +641,8 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { - SAlterVnodeMsg *pAlter = (SAlterVnodeMsg *)dndParseCreateVnodeReq(rpcMsg); +int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SAlterVnodeReq *pAlter = (SAlterVnodeReq *)dndParseCreateVnodeReq(pReq); dDebug("vgId:%d, alter vnode req is received", pAlter->vgId); SVnodeCfg vnodeCfg = {0}; @@ -651,7 +654,7 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId); if (pVnode == NULL) { dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr()); - return terrno; + return -1; } if (wrapperCfg.vgVersion == pVnode->vgVersion) { @@ -663,7 +666,7 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { if (vnodeAlter(pVnode->pImpl, &vnodeCfg) != 0) { dError("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr()); dndReleaseVnode(pDnode, pVnode); - return terrno; + return -1; } int32_t oldVersion = pVnode->vgVersion; @@ -677,8 +680,8 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return code; } -int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { - SDropVnodeMsg *pDrop = vnodeParseDropVnodeReq(rpcMsg); +int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SDropVnodeReq *pDrop = vnodeParseDropVnodeReq(pReq); int32_t vgId = pDrop->vgId; dDebug("vgId:%d, drop vnode req is received", vgId); @@ -686,13 +689,14 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) { dDebug("vgId:%d, failed to drop since %s", vgId, terrstr()); - return 0; + return -1; } pVnode->dropped = 1; if (dndWriteVnodesToFile(pDnode) != 0) { pVnode->dropped = 0; - return terrno; + dndReleaseVnode(pDnode, pVnode); + return -1; } dndReleaseVnode(pDnode, pVnode); @@ -704,10 +708,9 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { - SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); +int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SAuthVnodeReq *pAuth = (SAuthVnodeReq *)vnodeParseAuthVnodeReq(pReq); - int32_t code = 0; int32_t vgId = pAuth->vgId; dDebug("vgId:%d, auth vnode req is received", vgId); @@ -722,30 +725,30 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { return 0; } -int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { - SAuthVnodeMsg *pAuth = (SAuthVnodeMsg *)vnodeParseAuthVnodeReq(rpcMsg); +int32_t dndProcessSyncVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SSyncVnodeReq *pSync = (SSyncVnodeReq *)vnodeParseDropVnodeReq(pReq); - int32_t vgId = pAuth->vgId; - dDebug("vgId:%d, auth vnode req is received", vgId); + int32_t vgId = pSync->vgId; + dDebug("vgId:%d, sync vnode req is received", vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) { - dDebug("vgId:%d, failed to auth since %s", vgId, terrstr()); - return terrno; + dDebug("vgId:%d, failed to sync since %s", vgId, terrstr()); + return -1; } if (vnodeSync(pVnode->pImpl) != 0) { - dError("vgId:%d, failed to auth vnode since %s", vgId, terrstr()); + dError("vgId:%d, failed to sync vnode since %s", vgId, terrstr()); dndReleaseVnode(pDnode, pVnode); - return terrno; + return -1; } dndReleaseVnode(pDnode, pVnode); return 0; } -int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { - SCompactVnodeMsg *pCompact = (SCompactVnodeMsg *)vnodeParseDropVnodeReq(rpcMsg); +int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { + SCompactVnodeReq *pCompact = (SCompactVnodeReq *)vnodeParseDropVnodeReq(pReq); int32_t vgId = pCompact->vgId; dDebug("vgId:%d, compact vnode req is received", vgId); @@ -753,13 +756,13 @@ int32_t dndProcessCompactVnodeReq(SDnode *pDnode, SRpcMsg *rpcMsg) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) { dDebug("vgId:%d, failed to compact since %s", vgId, terrstr()); - return terrno; + return -1; } if (vnodeCompact(pVnode->pImpl) != 0) { dError("vgId:%d, failed to compact vnode since %s", vgId, terrstr()); dndReleaseVnode(pDnode, pVnode); - return terrno; + return -1; } dndReleaseVnode(pDnode, pVnode); diff --git a/source/dnode/mgmt/impl/test/vnode/vnode.cpp b/source/dnode/mgmt/impl/test/vnode/vnode.cpp index 29b6ed2459..7233137140 100644 --- a/source/dnode/mgmt/impl/test/vnode/vnode.cpp +++ b/source/dnode/mgmt/impl/test/vnode/vnode.cpp @@ -28,9 +28,9 @@ Testbase DndTestVnode::test; TEST_F(DndTestVnode, 01_Create_Restart_Drop_Vnode) { { for (int i = 0; i < 3; ++i) { - int32_t contLen = sizeof(SCreateVnodeMsg); + int32_t contLen = sizeof(SCreateVnodeReq); - SCreateVnodeMsg* pReq = (SCreateVnodeMsg*)rpcMallocCont(contLen); + SCreateVnodeReq* pReq = (SCreateVnodeReq*)rpcMallocCont(contLen); pReq->vgId = htonl(2); pReq->dnodeId = htonl(1); strcpy(pReq->db, "1.d1"); @@ -68,9 +68,9 @@ TEST_F(DndTestVnode, 01_Create_Restart_Drop_Vnode) { { for (int i = 0; i < 3; ++i) { - int32_t contLen = sizeof(SAlterVnodeMsg); + int32_t contLen = sizeof(SAlterVnodeReq); - SAlterVnodeMsg* pReq = (SAlterVnodeMsg*)rpcMallocCont(contLen); + SAlterVnodeReq* pReq = (SAlterVnodeReq*)rpcMallocCont(contLen); pReq->vgId = htonl(2); pReq->dnodeId = htonl(1); strcpy(pReq->db, "1.d1"); @@ -108,9 +108,9 @@ TEST_F(DndTestVnode, 01_Create_Restart_Drop_Vnode) { { for (int i = 0; i < 3; ++i) { - int32_t contLen = sizeof(SDropVnodeMsg); + int32_t contLen = sizeof(SDropVnodeReq); - SDropVnodeMsg* pReq = (SDropVnodeMsg*)rpcMallocCont(contLen); + SDropVnodeReq* pReq = (SDropVnodeReq*)rpcMallocCont(contLen); pReq->vgId = htonl(2); pReq->dnodeId = htonl(1); strcpy(pReq->db, "1.d1"); @@ -118,7 +118,7 @@ TEST_F(DndTestVnode, 01_Create_Restart_Drop_Vnode) { SRpcMsg rpcMsg = {0}; rpcMsg.pCont = pReq; - rpcMsg.contLen = sizeof(SDropVnodeMsg); + rpcMsg.contLen = sizeof(SDropVnodeReq); rpcMsg.msgType = TDMT_DND_DROP_VNODE; SRpcMsg* pRsp = test.SendReq(TDMT_DND_DROP_VNODE, pReq, contLen); diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 6d391450b7..9e4656fec8 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -31,8 +31,8 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups); SEpSet mndGetVgroupEpset(SMnode *pMnode, SVgObj *pVgroup); int32_t mndGetVnodesNum(SMnode *pMnode, int32_t dnodeId); -SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); -SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); +SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); +SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 251cbb9217..bf5d01d0a2 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -331,11 +331,11 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - SCreateVnodeMsg *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); + SCreateVnodeReq *pMsg = mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); if (pMsg == NULL) return -1; action.pCont = pMsg; - action.contLen = sizeof(SCreateVnodeMsg); + action.contLen = sizeof(SCreateVnodeReq); action.msgType = TDMT_DND_CREATE_VNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); @@ -360,11 +360,11 @@ static int32_t mndSetCreateDbUndoActions(SMnode *pMnode, STrans *pTrans, SDbObj action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); + SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); if (pMsg == NULL) return -1; action.pCont = pMsg; - action.contLen = sizeof(SDropVnodeMsg); + action.contLen = sizeof(SDropVnodeReq); action.msgType = TDMT_DND_DROP_VNODE; if (mndTransAppendUndoAction(pTrans, &action) != 0) { free(pMsg); @@ -593,11 +593,11 @@ static int32_t mndBuildUpdateVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - SAlterVnodeMsg *pMsg = (SAlterVnodeMsg *)mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); + SAlterVnodeReq *pMsg = (SAlterVnodeReq *)mndBuildCreateVnodeMsg(pMnode, pDnode, pDb, pVgroup); if (pMsg == NULL) return -1; action.pCont = pMsg; - action.contLen = sizeof(SAlterVnodeMsg); + action.contLen = sizeof(SAlterVnodeReq); action.msgType = TDMT_DND_ALTER_VNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); @@ -757,11 +757,11 @@ static int32_t mndBuildDropVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj * action.epSet = mndGetDnodeEpset(pDnode); mndReleaseDnode(pMnode, pDnode); - SDropVnodeMsg *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); + SDropVnodeReq *pMsg = mndBuildDropVnodeMsg(pMnode, pDnode, pDb, pVgroup); if (pMsg == NULL) return -1; action.pCont = pMsg; - action.contLen = sizeof(SCreateVnodeMsg); + action.contLen = sizeof(SCreateVnodeReq); action.msgType = TDMT_DND_DROP_VNODE; if (mndTransAppendRedoAction(pTrans, &action) != 0) { free(pMsg); diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index bd17c6d150..e9d35a6e4c 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -189,8 +189,8 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) { sdbRelease(pSdb, pVgroup); } -SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { - SCreateVnodeMsg *pCreate = calloc(1, sizeof(SCreateVnodeMsg)); +SCreateVnodeReq *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { + SCreateVnodeReq *pCreate = calloc(1, sizeof(SCreateVnodeReq)); if (pCreate == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; @@ -248,8 +248,8 @@ SCreateVnodeMsg *mndBuildCreateVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbOb return pCreate; } -SDropVnodeMsg *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { - SDropVnodeMsg *pDrop = calloc(1, sizeof(SDropVnodeMsg)); +SDropVnodeReq *mndBuildDropVnodeMsg(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup) { + SDropVnodeReq *pDrop = calloc(1, sizeof(SDropVnodeReq)); if (pDrop == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index eaaaf28297..3ea564722b 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -277,9 +277,12 @@ TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_NOT_DEPLOYED, "Bnode not deployed") TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_INVALID_OPTION, "Bnode option invalid") TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_READ_FILE_ERROR, "Read bnode.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_BNODE_WRITE_FILE_ERROR, "Write bnode.json error") -TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_TOO_MANY_VNODES, "Too many vnode directories") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED, "Vnode already deployed") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_NOT_DEPLOYED, "Vnode not deployed") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_INVALID_OPTION, "Vnode option invalid") TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_READ_FILE_ERROR, "Read vnodes.json error") TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_WRITE_FILE_ERROR, "Write vnodes.json error") +TAOS_DEFINE_ERROR(TSDB_CODE_DND_VNODE_TOO_MANY_VNODES, "Too many vnodes") // vnode TAOS_DEFINE_ERROR(TSDB_CODE_VND_ACTION_IN_PROGRESS, "Action in progress") From 1373e3cfe18354b56d709fadcada043f27dc7fac Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 8 Jan 2022 00:48:09 -0800 Subject: [PATCH 5/7] refact vnode queue --- source/dnode/mgmt/impl/src/dndBnode.c | 13 +- source/dnode/mgmt/impl/src/dndMnode.c | 13 +- source/dnode/mgmt/impl/src/dndQnode.c | 13 +- source/dnode/mgmt/impl/src/dndSnode.c | 13 +- source/dnode/mgmt/impl/src/dndVnodes.c | 314 +++++++----------------- source/dnode/vnode/impl/src/vnodeMain.c | 7 +- 6 files changed, 111 insertions(+), 262 deletions(-) diff --git a/source/dnode/mgmt/impl/src/dndBnode.c b/source/dnode/mgmt/impl/src/dndBnode.c index 61b220158f..15be59a419 100644 --- a/source/dnode/mgmt/impl/src/dndBnode.c +++ b/source/dnode/mgmt/impl/src/dndBnode.c @@ -42,18 +42,13 @@ static SBnode *dndAcquireBnode(SDnode *pDnode) { } static void dndReleaseBnode(SDnode *pDnode, SBnode *pBnode) { + if (pBnode == NULL) return; + SBnodeMgmt *pMgmt = &pDnode->bmgmt; - int32_t refCount = 0; - taosRLockLatch(&pMgmt->latch); - if (pBnode != NULL) { - refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - } + int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - - if (pBnode != NULL) { - dTrace("release bnode, refCount:%d", refCount); - } + dTrace("release bnode, refCount:%d", refCount); } static int32_t dndReadBnodeFile(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index ae37967b3d..6c23af7f00 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -43,18 +43,13 @@ static SMnode *dndAcquireMnode(SDnode *pDnode) { } static void dndReleaseMnode(SDnode *pDnode, SMnode *pMnode) { + if (pMnode == NULL) return; + SMnodeMgmt *pMgmt = &pDnode->mmgmt; - int32_t refCount = 0; - taosRLockLatch(&pMgmt->latch); - if (pMnode != NULL) { - refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - } + int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - - if (pMnode != NULL) { - dTrace("release mnode, refCount:%d", refCount); - } + dTrace("release mnode, refCount:%d", refCount); } static int32_t dndReadMnodeFile(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndQnode.c b/source/dnode/mgmt/impl/src/dndQnode.c index 3deee93e29..9d2f623c45 100644 --- a/source/dnode/mgmt/impl/src/dndQnode.c +++ b/source/dnode/mgmt/impl/src/dndQnode.c @@ -42,18 +42,13 @@ static SQnode *dndAcquireQnode(SDnode *pDnode) { } static void dndReleaseQnode(SDnode *pDnode, SQnode *pQnode) { + if (pQnode == NULL) return; + SQnodeMgmt *pMgmt = &pDnode->qmgmt; - int32_t refCount = 0; - taosRLockLatch(&pMgmt->latch); - if (pQnode != NULL) { - refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - } + int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - - if (pQnode != NULL) { - dTrace("release qnode, refCount:%d", refCount); - } + dTrace("release qnode, refCount:%d", refCount); } static int32_t dndReadQnodeFile(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndSnode.c b/source/dnode/mgmt/impl/src/dndSnode.c index ab4e38bfb2..00435d4c3e 100644 --- a/source/dnode/mgmt/impl/src/dndSnode.c +++ b/source/dnode/mgmt/impl/src/dndSnode.c @@ -42,18 +42,13 @@ static SSnode *dndAcquireSnode(SDnode *pDnode) { } static void dndReleaseSnode(SDnode *pDnode, SSnode *pSnode) { + if (pSnode == NULL) return; + SSnodeMgmt *pMgmt = &pDnode->smgmt; - int32_t refCount = 0; - taosRLockLatch(&pMgmt->latch); - if (pSnode != NULL) { - refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); - } + int32_t refCount = atomic_sub_fetch_32(&pMgmt->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - - if (pSnode != NULL) { - dTrace("release snode, refCount:%d", refCount); - } + dTrace("release snode, refCount:%d", refCount); } static int32_t dndReadSnodeFile(SDnode *pDnode) { diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 1269562968..6d90da8b31 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -40,7 +40,7 @@ typedef struct { STaosQueue *pSyncQ; STaosQueue *pApplyQ; STaosQueue *pQueryQ; - STaosQueue* pFetchQ; + STaosQueue *pFetchQ; } SVnodeObj; typedef struct { @@ -53,22 +53,8 @@ typedef struct { SWrapperCfg *pCfgs; } SVnodeThread; -static int32_t dndInitVnodeReadWorker(SDnode *pDnode); -static int32_t dndInitVnodeWriteWorker(SDnode *pDnode); -static int32_t dndInitVnodeSyncWorker(SDnode *pDnode); -static void dndCleanupVnodeReadWorker(SDnode *pDnode); -static void dndCleanupVnodeWriteWorker(SDnode *pDnode); -static void dndCleanupVnodeSyncWorker(SDnode *pDnode); -static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndAllocVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode); -static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndFreeVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndFreeVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndFreeVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode); -static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode); +static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode); +static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode); static void dndProcessVnodeQueryQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); static void dndProcessVnodeFetchQueue(SVnodeObj *pVnode, SRpcMsg *pMsg); @@ -117,11 +103,9 @@ static void dndReleaseVnode(SDnode *pDnode, SVnodeObj *pVnode) { if (pVnode == NULL) return; SVnodesMgmt *pMgmt = &pDnode->vmgmt; - taosRLockLatch(&pMgmt->latch); int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1); taosRUnLockLatch(&pMgmt->latch); - dTrace("vgId:%d, release vnode, refCount:%d", pVnode->vgId, refCount); } @@ -134,7 +118,7 @@ static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) { } pVnode->vgId = pCfg->vgId; - pVnode->refCount = 1; + pVnode->refCount = 0; pVnode->dropped = 0; pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->pImpl = pImpl; @@ -148,23 +132,8 @@ static int32_t dndOpenVnode(SDnode *pDnode, SWrapperCfg *pCfg, SVnode *pImpl) { return -1; } - if (dndAllocVnodeQueryQueue(pDnode, pVnode) != 0) { - return -1; - } - - if (dndAllocVnodeFetchQueue(pDnode, pVnode) != 0) { - return -1; - } - - if (dndAllocVnodeWriteQueue(pDnode, pVnode) != 0) { - return -1; - } - - if (dndAllocVnodeApplyQueue(pDnode, pVnode) != 0) { - return -1; - } - - if (dndAllocVnodeSyncQueue(pDnode, pVnode) != 0) { + if (dndAllocVnodeQueue(pDnode, pVnode) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } @@ -192,12 +161,7 @@ static void dndCloseVnode(SDnode *pDnode, SVnodeObj *pVnode) { while (!taosQueueEmpty(pVnode->pQueryQ)) taosMsleep(10); while (!taosQueueEmpty(pVnode->pFetchQ)) taosMsleep(10); - dndFreeVnodeQueryQueue(pDnode, pVnode); - dndFreeVnodeFetchQueue(pDnode, pVnode); - dndFreeVnodeWriteQueue(pDnode, pVnode); - dndFreeVnodeApplyQueue(pDnode, pVnode); - dndFreeVnodeSyncQueue(pDnode, pVnode); - + dndFreeVnodeQueue(pDnode, pVnode); vnodeClose(pVnode->pImpl); pVnode->pImpl = NULL; @@ -527,8 +491,8 @@ static void dndCloseVnodes(SDnode *pDnode) { dInfo("total vnodes:%d are all closed", numOfVnodes); } -static SCreateVnodeReq *dndParseCreateVnodeReq(SRpcMsg *rpcMsg) { - SCreateVnodeReq *pCreate = rpcMsg->pCont; +static SCreateVnodeReq *dndParseCreateVnodeReq(SRpcMsg *pReq) { + SCreateVnodeReq *pCreate = pReq->pCont; pCreate->vgId = htonl(pCreate->vgId); pCreate->dnodeId = htonl(pCreate->dnodeId); pCreate->dbUid = htobe64(pCreate->dbUid); @@ -585,14 +549,14 @@ static void dndGenerateWrapperCfg(SDnode *pDnode, SCreateVnodeReq *pCreate, SWra pCfg->vgVersion = pCreate->vgVersion; } -static SDropVnodeReq *vnodeParseDropVnodeReq(SRpcMsg *rpcMsg) { - SDropVnodeReq *pDrop = rpcMsg->pCont; +static SDropVnodeReq *vnodeParseDropVnodeReq(SRpcMsg *pReq) { + SDropVnodeReq *pDrop = pReq->pCont; pDrop->vgId = htonl(pDrop->vgId); return pDrop; } -static SAuthVnodeReq *vnodeParseAuthVnodeReq(SRpcMsg *rpcMsg) { - SAuthVnodeReq *pAuth = rpcMsg->pCont; +static SAuthVnodeReq *vnodeParseAuthVnodeReq(SRpcMsg *pReq) { + SAuthVnodeReq *pAuth = pReq->pCont; pAuth->vgId = htonl(pAuth->vgId); return pAuth; } @@ -612,7 +576,7 @@ int32_t dndProcessCreateVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { dDebug("vgId:%d, already exist, return success", pCreate->vgId); dndReleaseVnode(pDnode, pVnode); terrno = TSDB_CODE_DND_VNODE_ALREADY_DEPLOYED; - return -1; + return 0; } SVnode *pImpl = vnodeOpen(wrapperCfg.path, &vnodeCfg); @@ -648,16 +612,13 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { SVnodeCfg vnodeCfg = {0}; dndGenerateVnodeCfg(pAlter, &vnodeCfg); - SWrapperCfg wrapperCfg = {0}; - dndGenerateWrapperCfg(pDnode, pAlter, &wrapperCfg); - SVnodeObj *pVnode = dndAcquireVnode(pDnode, pAlter->vgId); if (pVnode == NULL) { dDebug("vgId:%d, failed to alter vnode since %s", pAlter->vgId, terrstr()); return -1; } - if (wrapperCfg.vgVersion == pVnode->vgVersion) { + if (pAlter->vgVersion == pVnode->vgVersion) { dndReleaseVnode(pDnode, pVnode); dDebug("vgId:%d, no need to alter vnode cfg for version unchanged ", pAlter->vgId); return 0; @@ -670,7 +631,7 @@ int32_t dndProcessAlterVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { } int32_t oldVersion = pVnode->vgVersion; - pVnode->vgVersion = wrapperCfg.vgVersion; + pVnode->vgVersion = pAlter->vgVersion; int32_t code = dndWriteVnodesToFile(pDnode); if (code != 0) { pVnode->vgVersion = oldVersion; @@ -689,7 +650,7 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) { dDebug("vgId:%d, failed to drop since %s", vgId, terrstr()); - return -1; + return 0; } pVnode->dropped = 1; @@ -717,7 +678,7 @@ int32_t dndProcessAuthVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); if (pVnode == NULL) { dDebug("vgId:%d, failed to auth since %s", vgId, terrstr()); - return terrno; + return -1; } pVnode->accessState = pAuth->accessState; @@ -821,6 +782,7 @@ static void dndProcessVnodeApplyQueue(SVnodeObj *pVnode, STaosQall *qall, int32_ for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); + // todo SRpcMsg *pRsp = NULL; (void)vnodeApplyWMsg(pVnode->pImpl, pMsg, &pRsp); } @@ -832,6 +794,7 @@ static void dndProcessVnodeSyncQueue(SVnodeObj *pVnode, STaosQall *qall, int32_t for (int32_t i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pMsg); + // todo SRpcMsg *pRsp = NULL; (void)vnodeProcessSyncReq(pVnode->pImpl, pMsg, &pRsp); } @@ -855,21 +818,25 @@ static int32_t dndWriteRpcMsgToVnodeQueue(STaosQueue *pQueue, SRpcMsg *pRpcMsg) } if (code != TSDB_CODE_SUCCESS) { - SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; - rpcSendResponse(&rsp); + if (pRpcMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = code}; + rpcSendResponse(&rsp); + } rpcFreeCont(pRpcMsg->pCont); } } static SVnodeObj *dndAcquireVnodeFromMsg(SDnode *pDnode, SRpcMsg *pMsg) { - SMsgHead *pHead = (SMsgHead *)pMsg->pCont; + SMsgHead *pHead = pMsg->pCont; pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); SVnodeObj *pVnode = dndAcquireVnode(pDnode, pHead->vgId); if (pVnode == NULL) { - SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; - rpcSendResponse(&rsp); + if (pMsg->msgType & 1u) { + SRpcMsg rsp = {.handle = pMsg->handle, .code = TSDB_CODE_VND_INVALID_VGROUP_ID}; + rpcSendResponse(&rsp); + } rpcFreeCont(pMsg->pCont); } @@ -910,193 +877,96 @@ void dndProcessVnodeFetchMsg(SDnode *pDnode, SRpcMsg *pMsg, SEpSet *pEpSet) { static int32_t dndPutMsgIntoVnodeApplyQueue(SDnode *pDnode, int32_t vgId, SRpcMsg *pMsg) { SVnodeObj *pVnode = dndAcquireVnode(pDnode, vgId); - if (pVnode == NULL) { - return -1; - } + if (pVnode == NULL) return -1; int32_t code = taosWriteQitem(pVnode->pApplyQ, pMsg); dndReleaseVnode(pDnode, pVnode); return code; } -static int32_t dndAllocVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); - if (pVnode->pQueryQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeVnodeQueryQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); - pVnode->pQueryQ = NULL; -} - -static int32_t dndAllocVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pFetchQ = tWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue); - if (pVnode->pFetchQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeVnodeFetchQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); - pVnode->pFetchQ = NULL; -} - -static int32_t dndInitVnodeReadWorker(SDnode *pDnode) { +static int32_t dndInitVnodeWorkers(SDnode *pDnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; int32_t maxFetchThreads = 4; - float threadsForQuery = MAX(pDnode->opt.numOfCores * pDnode->opt.ratioOfQueryCores, 1); + int32_t minFetchThreads = MIN(maxFetchThreads, pDnode->opt.numOfCores); + int32_t minQueryThreads = MAX((int32_t)(pDnode->opt.numOfCores * pDnode->opt.ratioOfQueryCores), 1); + int32_t maxQueryThreads = minQueryThreads; + int32_t maxWriteThreads = MAX(pDnode->opt.numOfCores, 1); + int32_t maxSyncThreads = MAX(pDnode->opt.numOfCores / 2, 1); SWorkerPool *pPool = &pMgmt->queryPool; pPool->name = "vnode-query"; - pPool->min = (int32_t)threadsForQuery; - pPool->max = pPool->min; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } + pPool->min = minQueryThreads; + pPool->max = maxQueryThreads; + if (tWorkerInit(pPool) != 0) return -1; pPool = &pMgmt->fetchPool; pPool->name = "vnode-fetch"; - pPool->min = MIN(maxFetchThreads, pDnode->opt.numOfCores); - pPool->max = pPool->min; - if (tWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } + pPool->min = minFetchThreads; + pPool->max = maxFetchThreads; + if (tWorkerInit(pPool) != 0) return -1; - dDebug("vnode read worker is initialized"); + SMWorkerPool *pMPool = &pMgmt->writePool; + pMPool->name = "vnode-write"; + pMPool->max = maxWriteThreads; + if (tMWorkerInit(pMPool) != 0) return -1; + + pMPool = &pMgmt->syncPool; + pMPool->name = "vnode-sync"; + pMPool->max = maxSyncThreads; + if (tMWorkerInit(pMPool) != 0) return -1; + + dDebug("vnode workers is initialized"); return 0; } -static void dndCleanupVnodeReadWorker(SDnode *pDnode) { +static void dndCleanupVnodeWorkers(SDnode *pDnode) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; tWorkerCleanup(&pMgmt->fetchPool); tWorkerCleanup(&pMgmt->queryPool); - dDebug("vnode close worker is initialized"); -} - -static int32_t dndAllocVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pWriteQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue); - if (pVnode->pWriteQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeVnodeWriteQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); - pVnode->pWriteQ = NULL; -} - -static int32_t dndAllocVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pApplyQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue); - if (pVnode->pApplyQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeVnodeApplyQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); - pVnode->pApplyQ = NULL; -} - -static int32_t dndInitVnodeWriteWorker(SDnode *pDnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SMWorkerPool *pPool = &pMgmt->writePool; - pPool->name = "vnode-write"; - pPool->max = pDnode->opt.numOfCores; - if (tMWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("vnode write worker is initialized"); - return 0; -} - -static void dndCleanupVnodeWriteWorker(SDnode *pDnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; tMWorkerCleanup(&pMgmt->writePool); - dDebug("vnode write worker is closed"); -} - -static int32_t dndAllocVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); - if (pVnode->pSyncQ == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - return 0; -} - -static void dndFreeVnodeSyncQueue(SDnode *pDnode, SVnodeObj *pVnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); - pVnode->pSyncQ = NULL; -} - -static int32_t dndInitVnodeSyncWorker(SDnode *pDnode) { - int32_t maxThreads = pDnode->opt.numOfCores / 2; - if (maxThreads < 1) maxThreads = 1; - - SVnodesMgmt *pMgmt = &pDnode->vmgmt; - SMWorkerPool *pPool = &pMgmt->syncPool; - pPool->name = "vnode-sync"; - pPool->max = maxThreads; - if (tMWorkerInit(pPool) != 0) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - dDebug("vnode sync worker is initialized"); - return 0; -} - -static void dndCleanupVnodeSyncWorker(SDnode *pDnode) { - SVnodesMgmt *pMgmt = &pDnode->vmgmt; tMWorkerCleanup(&pMgmt->syncPool); - dDebug("vnode sync worker is closed"); + dDebug("vnode workers is closed"); +} + +static int32_t dndAllocVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { + SVnodesMgmt *pMgmt = &pDnode->vmgmt; + + pVnode->pWriteQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeWriteQueue); + pVnode->pApplyQ = tMWorkerAllocQueue(&pMgmt->writePool, pVnode, (FProcessItems)dndProcessVnodeApplyQueue); + pVnode->pSyncQ = tMWorkerAllocQueue(&pMgmt->syncPool, pVnode, (FProcessItems)dndProcessVnodeSyncQueue); + pVnode->pFetchQ = tWorkerAllocQueue(&pMgmt->fetchPool, pVnode, (FProcessItem)dndProcessVnodeFetchQueue); + pVnode->pQueryQ = tWorkerAllocQueue(&pMgmt->queryPool, pVnode, (FProcessItem)dndProcessVnodeQueryQueue); + + if (pVnode->pApplyQ == NULL || pVnode->pWriteQ == NULL || pVnode->pSyncQ == NULL || pVnode->pFetchQ == NULL || + pVnode->pQueryQ == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + return 0; +} + +static void dndFreeVnodeQueue(SDnode *pDnode, SVnodeObj *pVnode) { + SVnodesMgmt *pMgmt = &pDnode->vmgmt; + tWorkerFreeQueue(&pMgmt->queryPool, pVnode->pQueryQ); + tWorkerFreeQueue(&pMgmt->fetchPool, pVnode->pFetchQ); + tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pWriteQ); + tMWorkerFreeQueue(&pMgmt->writePool, pVnode->pApplyQ); + tMWorkerFreeQueue(&pMgmt->syncPool, pVnode->pSyncQ); + pVnode->pWriteQ = NULL; + pVnode->pApplyQ = NULL; + pVnode->pSyncQ = NULL; + pVnode->pFetchQ = NULL; + pVnode->pQueryQ = NULL; } int32_t dndInitVnodes(SDnode *pDnode) { dInfo("dnode-vnodes start to init"); - if (dndInitVnodeReadWorker(pDnode) != 0) { - dError("failed to init vnodes read worker since %s", terrstr()); - return -1; - } - - if (dndInitVnodeWriteWorker(pDnode) != 0) { - dError("failed to init vnodes write worker since %s", terrstr()); - return -1; - } - - if (dndInitVnodeSyncWorker(pDnode) != 0) { - dError("failed to init vnodes sync worker since %s", terrstr()); + if (dndInitVnodeWorkers(pDnode) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + dError("failed to init vnode workers since %s", terrstr()); return -1; } @@ -1112,9 +982,7 @@ int32_t dndInitVnodes(SDnode *pDnode) { void dndCleanupVnodes(SDnode *pDnode) { dInfo("dnode-vnodes start to clean up"); dndCloseVnodes(pDnode); - dndCleanupVnodeReadWorker(pDnode); - dndCleanupVnodeWriteWorker(pDnode); - dndCleanupVnodeSyncWorker(pDnode); + dndCleanupVnodeWorkers(pDnode); dInfo("dnode-vnodes is cleaned up"); } diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index c87749203a..88175646ae 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -20,12 +20,13 @@ static void vnodeFree(SVnode *pVnode); static int vnodeOpenImpl(SVnode *pVnode); static void vnodeCloseImpl(SVnode *pVnode); -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfgInput) { +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; // Set default options - SVnodeCfg *pVnodeCfg = &defaultVnodeOptions; - pVnodeCfg->vgId = pVnodeCfg->vgId; + if (pVnodeCfg == NULL) { + pVnodeCfg = &defaultVnodeOptions; + } // Validate options if (vnodeValidateOptions(pVnodeCfg) < 0) { From 0fb70ce2cae1be55476ea9b2f2a07ea791d57952 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sat, 8 Jan 2022 01:12:11 -0800 Subject: [PATCH 6/7] minor changes --- source/dnode/mgmt/impl/src/dndVnodes.c | 2 -- 1 file changed, 2 deletions(-) diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 6d90da8b31..5198e351ab 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -475,7 +475,6 @@ static void dndCloseVnodes(SDnode *pDnode) { SVnodeObj **pVnodes = dndGetVnodesFromHash(pDnode, &numOfVnodes); for (int32_t i = 0; i < numOfVnodes; ++i) { - dndReleaseVnode(pDnode, pVnodes[i]); dndCloseVnode(pDnode, pVnodes[i]); } @@ -660,7 +659,6 @@ int32_t dndProcessDropVnodeReq(SDnode *pDnode, SRpcMsg *pReq) { return -1; } - dndReleaseVnode(pDnode, pVnode); dndCloseVnode(pDnode, pVnode); vnodeClose(pVnode->pImpl); vnodeDestroy(pVnode->path); From 371432c14333e42532a3a997d4aaea525fa0b093 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 9 Jan 2022 18:49:48 -0800 Subject: [PATCH 7/7] minor changes --- source/dnode/vnode/impl/src/vnodeMain.c | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/source/dnode/vnode/impl/src/vnodeMain.c b/source/dnode/vnode/impl/src/vnodeMain.c index 88175646ae..995bed6e0b 100644 --- a/source/dnode/vnode/impl/src/vnodeMain.c +++ b/source/dnode/vnode/impl/src/vnodeMain.c @@ -24,9 +24,9 @@ SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg) { SVnode *pVnode = NULL; // Set default options - if (pVnodeCfg == NULL) { + //if (pVnodeCfg == NULL) { pVnodeCfg = &defaultVnodeOptions; - } + //} // Validate options if (vnodeValidateOptions(pVnodeCfg) < 0) {