From 34619844bf668f3af7f77d069f43973ef707a2d9 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Thu, 23 Dec 2021 01:47:21 -0800 Subject: [PATCH] fix bug while exec trans --- .clang-format | 1 + include/common/taosmsg.h | 7 - source/dnode/mgmt/impl/src/dndMnode.c | 41 ++- source/dnode/mgmt/impl/test/mnode/mnode.cpp | 342 +++++++++++--------- source/dnode/mnode/impl/src/mndMnode.c | 21 +- source/dnode/mnode/impl/src/mndTrans.c | 2 +- source/dnode/mnode/impl/src/mnode.c | 2 + source/dnode/mnode/sdb/src/sdb.c | 35 +- source/dnode/mnode/sdb/src/sdbFile.c | 28 +- 9 files changed, 263 insertions(+), 216 deletions(-) diff --git a/.clang-format b/.clang-format index 3ddd8b43f6..f60fd3cb26 100644 --- a/.clang-format +++ b/.clang-format @@ -86,5 +86,6 @@ SpacesInSquareBrackets: false Standard: Auto TabWidth: 8 UseTab: Never +AlignConsecutiveDeclarations: true ... diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index c7a521413f..b3f997ecd9 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -909,36 +909,29 @@ typedef struct SShowRsp { typedef struct { char ep[TSDB_EP_LEN]; // end point, hostname:port - int32_t reserve[8]; } SCreateDnodeMsg; typedef struct { int32_t dnodeId; - int32_t reserve[8]; } SDropDnodeMsg; typedef struct { int32_t dnodeId; char config[TSDB_DNODE_CONFIG_LEN]; - int32_t reserve[8]; } SCfgDnodeMsg; typedef struct { int32_t dnodeId; - int32_t reserve[8]; } SCreateMnodeMsg, SDropMnodeMsg; typedef struct { int32_t dnodeId; - int8_t align[3]; int8_t replica; SReplica replicas[TSDB_MAX_REPLICA]; - int32_t reserve[8]; } SCreateMnodeInMsg, SAlterMnodeInMsg; typedef struct { int32_t dnodeId; - int32_t reserve[8]; } SDropMnodeInMsg; typedef struct { diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 3cf08e619e..4fd6b021c8 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -349,7 +349,7 @@ static void dndBuildMnodeDeployOption(SDnode *pDnode, SMnodeOpt *pOption) { SReplica *pReplica = &pOption->replicas[0]; pReplica->id = 1; pReplica->port = pDnode->opt.serverPort; - tstrncpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); + memcpy(pReplica->fqdn, pDnode->opt.localFqdn, TSDB_FQDN_LEN); SMnodeMgmt *pMgmt = &pDnode->mmgmt; pMgmt->selfIndex = pOption->selfIndex; @@ -376,7 +376,7 @@ static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SC SReplica *pReplica = &pOption->replicas[i]; pReplica->id = pMsg->replicas[i].id; pReplica->port = pMsg->replicas[i].port; - tstrncpy(pReplica->fqdn, pMsg->replicas[i].fqdn, TSDB_FQDN_LEN); + memcpy(pReplica->fqdn, pMsg->replicas[i].fqdn, TSDB_FQDN_LEN); if (pReplica->id == pOption->dnodeId) { pOption->selfIndex = i; } @@ -499,7 +499,7 @@ static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { } static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg->pCont); + SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; @@ -515,18 +515,23 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg->pCont); + SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; return -1; - } else { - SMnodeOpt option = {0}; - if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) { - return -1; - } - return dndAlterMnode(pDnode, &option); } + + SMnodeOpt option = {0}; + if (dndBuildMnodeOptionFromMsg(pDnode, &option, pMsg) != 0) { + return -1; + } + + if (dndAlterMnode(pDnode, &option) != 0) { + return -1; + } + + return dndWriteMnodeFile(pDnode); } static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { @@ -555,16 +560,17 @@ static void dndProcessMnodeMgmtQueue(SDnode *pDnode, SRpcMsg *pMsg) { code = dndProcessDropMnodeReq(pDnode, pMsg); break; default: - code = TSDB_CODE_MSG_NOT_PROCESSED; + terrno = TSDB_CODE_MSG_NOT_PROCESSED; + code = -1; break; } if (pMsg->msgType & 1u) { + if (code != 0) code = terrno; SRpcMsg rsp = {.code = code, .handle = pMsg->handle}; rpcSendResponse(&rsp); } rpcFreeCont(pMsg->pCont); - pMsg->pCont = NULL; taosFreeQitem(pMsg); } @@ -625,8 +631,6 @@ static void dndProcessMnodeSyncQueue(SDnode *pDnode, SMnodeMsg *pMsg) { } static int32_t dndWriteMnodeMsgToQueue(SMnode *pMnode, taos_queue pQueue, SRpcMsg *pRpcMsg) { - assert(pQueue); - SMnodeMsg *pMsg = mndInitMsg(pMnode, pRpcMsg); if (pMsg == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; @@ -647,13 +651,14 @@ void dndProcessMnodeMgmtMsg(SDnode *pDnode, SRpcMsg *pRpcMsg, SEpSet *pEpSet) { SMnode *pMnode = dndAcquireMnode(pDnode); SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg)); + if (pMsg != NULL) *pMsg = *pRpcMsg; + if (pMsg == NULL || taosWriteQitem(pMgmt->pMgmtQ, pMsg) != 0) { if (pRpcMsg->msgType & 1u) { SRpcMsg rsp = {.handle = pRpcMsg->handle, .code = TSDB_CODE_OUT_OF_MEMORY}; rpcSendResponse(&rsp); } rpcFreeCont(pRpcMsg->pCont); - pRpcMsg->pCont = NULL; taosFreeQitem(pMsg); } } @@ -894,6 +899,11 @@ int32_t dndInitMnode(SDnode *pDnode) { return -1; } + if (dndAllocMnodeMgmtQueue(pDnode) != 0) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + char path[PATH_MAX]; snprintf(path, PATH_MAX, "%s/mnode.json", pDnode->dir.dnode); pMgmt->file = strdup(path); @@ -937,6 +947,7 @@ void dndCleanupMnode(SDnode *pDnode) { dInfo("dnode-mnode start to clean up"); dndStopMnodeWorker(pDnode); dndCleanupMnodeMgmtWorker(pDnode); + dndFreeMnodeMgmtQueue(pDnode); tfree(pMgmt->file); mndClose(pMgmt->pMnode); dInfo("dnode-mnode is cleaned up"); diff --git a/source/dnode/mgmt/impl/test/mnode/mnode.cpp b/source/dnode/mgmt/impl/test/mnode/mnode.cpp index cc0d25edea..6f66a0b12f 100644 --- a/source/dnode/mgmt/impl/test/mnode/mnode.cpp +++ b/source/dnode/mgmt/impl/test/mnode/mnode.cpp @@ -70,21 +70,35 @@ TEST_F(DndTestMnode, 01_ShowDnode) { CheckTimestamp(); } -#if 0 -TEST_F(DndTestMnode, 02_ConfigDnode) { - int32_t contLen = sizeof(SCfgDnodeMsg); +TEST_F(DndTestMnode, 02_Create_Mnode_Invalid_Id) { + { + int32_t contLen = sizeof(SCreateMnodeMsg); - SCfgDnodeMsg* pReq = (SCfgDnodeMsg*)rpcMallocCont(contLen); - pReq->dnodeId = htonl(1); - strcpy(pReq->config, "ddebugflag 131"); + SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(1); - SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CONFIG_DNODE, pReq, contLen); - ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, 0); + SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_MNODE_ALREADY_EXIST); + } } -TEST_F(DndTestMnode, 03_Create_Drop_Restart_Dnode) { +TEST_F(DndTestMnode, 03_Create_Mnode_Invalid_Id) { { + int32_t contLen = sizeof(SCreateMnodeMsg); + + SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen); + pReq->dnodeId = htonl(2); + + SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_MNODE, pReq, contLen); + ASSERT_NE(pMsg, nullptr); + ASSERT_EQ(pMsg->code, TSDB_CODE_MND_DNODE_NOT_EXIST); + } +} + +TEST_F(DndTestMnode, 04_Create_Mnode) { + { + // create dnode int32_t contLen = sizeof(SCreateDnodeMsg); SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); @@ -93,164 +107,172 @@ TEST_F(DndTestMnode, 03_Create_Drop_Restart_Dnode) { SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen); ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); + + taosMsleep(1300); + test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, ""); + test.SendShowRetrieveMsg(); + EXPECT_EQ(test.GetShowRows(), 2); } - taosMsleep(1300); - - test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, ""); - CHECK_META("show dnodes", 7); - test.SendShowRetrieveMsg(); - EXPECT_EQ(test.GetShowRows(), 2); - - CheckInt16(1); - CheckInt16(2); - CheckBinary("localhost:9061", TSDB_EP_LEN); - CheckBinary("localhost:9062", TSDB_EP_LEN); - CheckInt16(0); - CheckInt16(0); - CheckInt16(1); - CheckInt16(1); - CheckBinary("ready", 10); - CheckBinary("ready", 10); - CheckTimestamp(); - CheckTimestamp(); - CheckBinary("", 24); - CheckBinary("", 24); - { - int32_t contLen = sizeof(SDropDnodeMsg); + // create mnode + int32_t contLen = sizeof(SCreateMnodeMsg); - SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(contLen); + SCreateMnodeMsg* pReq = (SCreateMnodeMsg*)rpcMallocCont(contLen); pReq->dnodeId = htonl(2); - SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_DNODE, pReq, contLen); + SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_MNODE, pReq, contLen); ASSERT_NE(pMsg, nullptr); ASSERT_EQ(pMsg->code, 0); + + test.SendShowMetaMsg(TSDB_MGMT_TABLE_MNODE, ""); + test.SendShowRetrieveMsg(); + EXPECT_EQ(test.GetShowRows(), 2); + + CheckInt16(1); + CheckInt16(2); + CheckBinary("localhost:9061", TSDB_EP_LEN); + CheckBinary("localhost:9062", TSDB_EP_LEN); + CheckBinary("master", 12); + CheckBinary("slave", 12); + CheckInt64(0); + CheckInt64(0); + CheckTimestamp(); + CheckTimestamp(); } - - test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, ""); - CHECK_META("show dnodes", 7); - test.SendShowRetrieveMsg(); - EXPECT_EQ(test.GetShowRows(), 1); - - CheckInt16(1); - CheckBinary("localhost:9061", TSDB_EP_LEN); - CheckInt16(0); - CheckInt16(1); - CheckBinary("ready", 10); - CheckTimestamp(); - CheckBinary("", 24); - - { - int32_t contLen = sizeof(SCreateDnodeMsg); - - SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); - strcpy(pReq->ep, "localhost:9063"); - - SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen); - ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, 0); - } - - { - int32_t contLen = sizeof(SCreateDnodeMsg); - - SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); - strcpy(pReq->ep, "localhost:9064"); - - SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen); - ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, 0); - } - - { - int32_t contLen = sizeof(SCreateDnodeMsg); - - SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); - strcpy(pReq->ep, "localhost:9065"); - - SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen); - ASSERT_NE(pMsg, nullptr); - ASSERT_EQ(pMsg->code, 0); - } - - taosMsleep(1300); - test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, ""); - CHECK_META("show dnodes", 7); - test.SendShowRetrieveMsg(); - EXPECT_EQ(test.GetShowRows(), 4); - - CheckInt16(1); - CheckInt16(3); - CheckInt16(4); - CheckInt16(5); - CheckBinary("localhost:9061", TSDB_EP_LEN); - CheckBinary("localhost:9063", TSDB_EP_LEN); - CheckBinary("localhost:9064", TSDB_EP_LEN); - CheckBinary("localhost:9065", TSDB_EP_LEN); - CheckInt16(0); - CheckInt16(0); - CheckInt16(0); - CheckInt16(0); - CheckInt16(1); - CheckInt16(1); - CheckInt16(1); - CheckInt16(1); - CheckBinary("ready", 10); - CheckBinary("ready", 10); - CheckBinary("ready", 10); - CheckBinary("ready", 10); - CheckTimestamp(); - CheckTimestamp(); - CheckTimestamp(); - CheckTimestamp(); - CheckBinary("", 24); - CheckBinary("", 24); - CheckBinary("", 24); - CheckBinary("", 24); - - // restart - uInfo("stop all server"); - test.Restart(); - server2.Restart(); - server3.Restart(); - server4.Restart(); - server5.Restart(); - - taosMsleep(1300); - test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, ""); - CHECK_META("show dnodes", 7); - test.SendShowRetrieveMsg(); - EXPECT_EQ(test.GetShowRows(), 4); - - CheckInt16(1); - CheckInt16(3); - CheckInt16(4); - CheckInt16(5); - CheckBinary("localhost:9061", TSDB_EP_LEN); - CheckBinary("localhost:9063", TSDB_EP_LEN); - CheckBinary("localhost:9064", TSDB_EP_LEN); - CheckBinary("localhost:9065", TSDB_EP_LEN); - CheckInt16(0); - CheckInt16(0); - CheckInt16(0); - CheckInt16(0); - CheckInt16(1); - CheckInt16(1); - CheckInt16(1); - CheckInt16(1); - CheckBinary("ready", 10); - CheckBinary("ready", 10); - CheckBinary("ready", 10); - CheckBinary("ready", 10); - CheckTimestamp(); - CheckTimestamp(); - CheckTimestamp(); - CheckTimestamp(); - CheckBinary("", 24); - CheckBinary("", 24); - CheckBinary("", 24); - CheckBinary("", 24); } +// { +// int32_t contLen = sizeof(SDropDnodeMsg); -#endif \ No newline at end of file +// SDropDnodeMsg* pReq = (SDropDnodeMsg*)rpcMallocCont(contLen); +// pReq->dnodeId = htonl(2); + +// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_DROP_DNODE, pReq, contLen); +// ASSERT_NE(pMsg, nullptr); +// ASSERT_EQ(pMsg->code, 0); +// } + +// test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, ""); +// CHECK_META("show dnodes", 7); +// test.SendShowRetrieveMsg(); +// EXPECT_EQ(test.GetShowRows(), 1); + +// CheckInt16(1); +// CheckBinary("localhost:9061", TSDB_EP_LEN); +// CheckInt16(0); +// CheckInt16(1); +// CheckBinary("ready", 10); +// CheckTimestamp(); +// CheckBinary("", 24); + +// { +// int32_t contLen = sizeof(SCreateDnodeMsg); + +// SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); +// strcpy(pReq->ep, "localhost:9063"); + +// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen); +// ASSERT_NE(pMsg, nullptr); +// ASSERT_EQ(pMsg->code, 0); +// } + +// { +// int32_t contLen = sizeof(SCreateDnodeMsg); + +// SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); +// strcpy(pReq->ep, "localhost:9064"); + +// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen); +// ASSERT_NE(pMsg, nullptr); +// ASSERT_EQ(pMsg->code, 0); +// } + +// { +// int32_t contLen = sizeof(SCreateDnodeMsg); + +// SCreateDnodeMsg* pReq = (SCreateDnodeMsg*)rpcMallocCont(contLen); +// strcpy(pReq->ep, "localhost:9065"); + +// SRpcMsg* pMsg = test.SendMsg(TSDB_MSG_TYPE_CREATE_DNODE, pReq, contLen); +// ASSERT_NE(pMsg, nullptr); +// ASSERT_EQ(pMsg->code, 0); +// } + +// taosMsleep(1300); +// test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, ""); +// CHECK_META("show dnodes", 7); +// test.SendShowRetrieveMsg(); +// EXPECT_EQ(test.GetShowRows(), 4); + +// CheckInt16(1); +// CheckInt16(3); +// CheckInt16(4); +// CheckInt16(5); +// CheckBinary("localhost:9061", TSDB_EP_LEN); +// CheckBinary("localhost:9063", TSDB_EP_LEN); +// CheckBinary("localhost:9064", TSDB_EP_LEN); +// CheckBinary("localhost:9065", TSDB_EP_LEN); +// CheckInt16(0); +// CheckInt16(0); +// CheckInt16(0); +// CheckInt16(0); +// CheckInt16(1); +// CheckInt16(1); +// CheckInt16(1); +// CheckInt16(1); +// CheckBinary("ready", 10); +// CheckBinary("ready", 10); +// CheckBinary("ready", 10); +// CheckBinary("ready", 10); +// CheckTimestamp(); +// CheckTimestamp(); +// CheckTimestamp(); +// CheckTimestamp(); +// CheckBinary("", 24); +// CheckBinary("", 24); +// CheckBinary("", 24); +// CheckBinary("", 24); + +// // restart +// uInfo("stop all server"); +// test.Restart(); +// server2.Restart(); +// server3.Restart(); +// server4.Restart(); +// server5.Restart(); + +// taosMsleep(1300); +// test.SendShowMetaMsg(TSDB_MGMT_TABLE_DNODE, ""); +// CHECK_META("show dnodes", 7); +// test.SendShowRetrieveMsg(); +// EXPECT_EQ(test.GetShowRows(), 4); + +// CheckInt16(1); +// CheckInt16(3); +// CheckInt16(4); +// CheckInt16(5); +// CheckBinary("localhost:9061", TSDB_EP_LEN); +// CheckBinary("localhost:9063", TSDB_EP_LEN); +// CheckBinary("localhost:9064", TSDB_EP_LEN); +// CheckBinary("localhost:9065", TSDB_EP_LEN); +// CheckInt16(0); +// CheckInt16(0); +// CheckInt16(0); +// CheckInt16(0); +// CheckInt16(1); +// CheckInt16(1); +// CheckInt16(1); +// CheckInt16(1); +// CheckBinary("ready", 10); +// CheckBinary("ready", 10); +// CheckBinary("ready", 10); +// CheckBinary("ready", 10); +// CheckTimestamp(); +// CheckTimestamp(); +// CheckTimestamp(); +// CheckTimestamp(); +// CheckBinary("", 24); +// CheckBinary("", 24); +// CheckBinary("", 24); +// CheckBinary("", 24); +// } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 80b4fabca9..37c9ac513a 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -31,6 +31,7 @@ static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg); static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg); static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg); +static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg); static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg); static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); static int32_t mndRetrieveMnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); @@ -49,6 +50,7 @@ int32_t mndInitMnode(SMnode *pMnode) { mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE, mndProcessCreateMnodeReq); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE, mndProcessDropMnodeReq); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP, mndProcessCreateMnodeRsp); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_ALTER_MNODE_IN_RSP, mndProcessAlterMnodeRsp); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE_IN_RSP, mndProcessDropMnodeRsp); mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndGetMnodeMeta); @@ -270,6 +272,8 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno memcpy(pReplica->fqdn, pDnode->fqdn, TSDB_FQDN_LEN); numOfReplicas++; + createMsg.replica = numOfReplicas; + while (1) { SMnodeObj *pMObj = NULL; pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); @@ -382,7 +386,7 @@ static int32_t mndProcessCreateMnodeReq(SMnodeMsg *pMsg) { SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); if (pDnode == NULL) { - mError("mnode:%d, dnode not exist", pDnode->id); + mError("mnode:%d, dnode not exist", pCreate->dnodeId); terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; return -1; } @@ -560,9 +564,20 @@ static int32_t mndProcessDropMnodeReq(SMnodeMsg *pMsg) { return TSDB_CODE_MND_ACTION_IN_PROGRESS; } -static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} -static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg) { return 0; } +static int32_t mndProcessAlterMnodeRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} + +static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg) { + mndTransHandleActionRsp(pMsg); + return 0; +} static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { SMnode *pMnode = pMsg->pMnode; diff --git a/source/dnode/mnode/impl/src/mndTrans.c b/source/dnode/mnode/impl/src/mndTrans.c index 54cd6ab501..504717117b 100644 --- a/source/dnode/mnode/impl/src/mndTrans.c +++ b/source/dnode/mnode/impl/src/mndTrans.c @@ -622,7 +622,7 @@ void mndTransHandleActionRsp(SMnodeMsg *pMsg) { STransAction *pAction = taosArrayGet(pArray, action); if (pAction != NULL) { pAction->msgReceived = 1; - pAction->errCode = pMsg->code; + pAction->errCode = pMsg->rpcMsg.code; } mDebug("trans:%d, action:%d response is received, code:0x%x", transId, action, pMsg->code); diff --git a/source/dnode/mnode/impl/src/mnode.c b/source/dnode/mnode/impl/src/mnode.c index fb0b95dc4a..a62a0a9296 100644 --- a/source/dnode/mnode/impl/src/mnode.c +++ b/source/dnode/mnode/impl/src/mnode.c @@ -178,8 +178,10 @@ static int32_t mndExecSteps(SMnode *pMnode) { // (*pMnode->reportProgress)(pStep->name, "start initialize"); if ((*pStep->initFp)(pMnode) != 0) { + int32_t code = terrno; mError("step:%s exec failed since %s, start to cleanup", pStep->name, terrstr()); mndCleanupSteps(pMnode, pos); + terrno = code; return -1; } else { mDebug("step:%s is initialized", pStep->name); diff --git a/source/dnode/mnode/sdb/src/sdb.c b/source/dnode/mnode/sdb/src/sdb.c index 77614e399e..bb0e606463 100644 --- a/source/dnode/mnode/sdb/src/sdb.c +++ b/source/dnode/mnode/sdb/src/sdb.c @@ -16,6 +16,8 @@ #define _DEFAULT_SOURCE #include "sdbInt.h" +static int32_t sdbCreateDir(SSdb *pSdb); + SSdb *sdbInit(SSdbOpt *pOption) { mDebug("start to init sdb in %s", pOption->path); @@ -40,6 +42,11 @@ SSdb *sdbInit(SSdbOpt *pOption) { return NULL; } + if (sdbCreateDir(pSdb) != 0) { + sdbCleanup(pSdb); + return NULL; + } + for (ESdbType i = 0; i < SDB_MAX; ++i) { taosInitRWLatch(&pSdb->locks[i]); } @@ -53,8 +60,8 @@ void sdbCleanup(SSdb *pSdb) { mDebug("start to cleanup sdb"); // if (pSdb->curVer != pSdb->lastCommitVer) { - mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer); - sdbWriteFile(pSdb); + mDebug("write sdb file for curVer:% " PRId64 " and lastVer:%" PRId64, pSdb->curVer, pSdb->lastCommitVer); + sdbWriteFile(pSdb); // } if (pSdb->currDir != NULL) { @@ -133,4 +140,26 @@ int32_t sdbSetTable(SSdb *pSdb, SSdbTable table) { mDebug("sdb table:%d is initialized", sdbType); return 0; -} \ No newline at end of file +} + +static int32_t sdbCreateDir(SSdb *pSdb) { + if (taosMkDir(pSdb->currDir) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to create dir:%s since %s", pSdb->currDir, terrstr()); + return -1; + } + + if (taosMkDir(pSdb->syncDir) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to create dir:%s since %s", pSdb->syncDir, terrstr()); + return -1; + } + + if (taosMkDir(pSdb->tmpDir) != 0) { + terrno = TAOS_SYSTEM_ERROR(errno); + mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr()); + return -1; + } + + return 0; +} diff --git a/source/dnode/mnode/sdb/src/sdbFile.c b/source/dnode/mnode/sdb/src/sdbFile.c index af37e9e1d5..7828e39e56 100644 --- a/source/dnode/mnode/sdb/src/sdbFile.c +++ b/source/dnode/mnode/sdb/src/sdbFile.c @@ -17,28 +17,6 @@ #include "sdbInt.h" #include "tchecksum.h" -static int32_t sdbCreateDir(SSdb *pSdb) { - if (taosMkDir(pSdb->currDir) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to create dir:%s since %s", pSdb->currDir, terrstr()); - return -1; - } - - if (taosMkDir(pSdb->syncDir) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to create dir:%s since %s", pSdb->syncDir, terrstr()); - return -1; - } - - if (taosMkDir(pSdb->tmpDir) != 0) { - terrno = TAOS_SYSTEM_ERROR(errno); - mError("failed to create dir:%s since %s", pSdb->tmpDir, terrstr()); - return -1; - } - - return 0; -} - static int32_t sdbRunDeployFp(SSdb *pSdb) { mDebug("start to deploy sdb"); @@ -77,7 +55,7 @@ int32_t sdbReadFile(SSdb *pSdb) { free(pRaw); terrno = TAOS_SYSTEM_ERROR(errno); mError("failed to read file:%s since %s", file, terrstr()); - return -1; + return 0; } while (1) { @@ -225,10 +203,6 @@ int32_t sdbWriteFile(SSdb *pSdb) { } int32_t sdbDeploy(SSdb *pSdb) { - if (sdbCreateDir(pSdb) != 0) { - return -1; - } - if (sdbRunDeployFp(pSdb) != 0) { return -1; }