feat[cluster]: create and drop mnode

This commit is contained in:
Shengliang Guan 2022-04-09 19:00:52 +08:00
parent a0460538ac
commit c1b3afa92d
15 changed files with 121 additions and 97 deletions

View File

@ -195,6 +195,9 @@ int32_t* taosGetErrno();
#define TSDB_CODE_MND_SNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0355) #define TSDB_CODE_MND_SNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0355)
#define TSDB_CODE_MND_BNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0356) #define TSDB_CODE_MND_BNODE_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0356)
#define TSDB_CODE_MND_BNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0357) #define TSDB_CODE_MND_BNODE_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x0357)
#define TSDB_CODE_MND_TOO_FEW_MNODES TAOS_DEF_ERROR_CODE(0, 0x0358)
#define TSDB_CODE_MND_MNODE_DEPLOYED TAOS_DEF_ERROR_CODE(0, 0x0359)
#define TSDB_CODE_MND_CANT_DROP_MASTER TAOS_DEF_ERROR_CODE(0, 0x035A)
// mnode-acct // mnode-acct
#define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360) #define TSDB_CODE_MND_ACCT_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0360)

View File

@ -62,6 +62,7 @@ int32_t mmProcessSyncMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessReadMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg); int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg);
int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); int32_t mmPutMsgToQueryQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc);
int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); int32_t mmPutMsgToReadQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc);
int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc); int32_t mmPutMsgToWriteQueue(SMgmtWrapper *pWrapper, SRpcMsg *pRpc);

View File

@ -48,15 +48,13 @@ static void mmInitOption(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
} }
static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) { static void mmBuildOptionForDeploy(SMnodeMgmt *pMgmt, SMnodeOpt *pOption) {
SDnode *pDnode = pMgmt->pDnode;
mmInitOption(pMgmt, pOption); mmInitOption(pMgmt, pOption);
pOption->replica = 1; pOption->replica = 1;
pOption->selfIndex = 0; pOption->selfIndex = 0;
SReplica *pReplica = &pOption->replicas[0]; SReplica *pReplica = &pOption->replicas[0];
pReplica->id = 1; pReplica->id = 1;
pReplica->port = pDnode->serverPort; pReplica->port = pMgmt->pDnode->serverPort;
tstrncpy(pReplica->fqdn, pDnode->localFqdn, TSDB_FQDN_LEN); tstrncpy(pReplica->fqdn, pMgmt->pDnode->localFqdn, TSDB_FQDN_LEN);
pOption->deploy = true; pOption->deploy = true;
pMgmt->selfIndex = pOption->selfIndex; pMgmt->selfIndex = pOption->selfIndex;
@ -151,12 +149,9 @@ static void mmCloseImp(SMnodeMgmt *pMgmt) {
int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) { int32_t mmAlter(SMnodeMgmt *pMgmt, SDAlterMnodeReq *pReq) {
SMnodeOpt option = {0}; SMnodeOpt option = {0};
if (pReq != NULL) { if (mmBuildOptionFromReq(pMgmt, &option, pReq) != 0) {
if (mmBuildOptionFromReq(pMgmt, &option, pReq) != 0) { return -1;
return -1;
}
} }
return mndAlter(pMgmt->pMnode, &option); return mndAlter(pMgmt->pMnode, &option);
} }
@ -240,4 +235,3 @@ void mmSetMgmtFp(SMgmtWrapper *pWrapper) {
pWrapper->name = "mnode"; pWrapper->name = "mnode";
pWrapper->fp = mgmtFp; pWrapper->fp = mgmtFp;
} }

View File

@ -16,6 +16,15 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "mmInt.h" #include "mmInt.h"
static inline void mmSendRsp(SNodeMsg *pMsg, int32_t code) {
SRpcMsg rsp = {.handle = pMsg->rpcMsg.handle,
.ahandle = pMsg->rpcMsg.ahandle,
.code = code,
.pCont = pMsg->pRsp,
.contLen = pMsg->rspLen};
tmsgSendRsp(&rsp);
}
static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle; SMnodeMgmt *pMgmt = pInfo->ahandle;
@ -35,8 +44,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
if (pRpc->msgType & 1U) { if (pRpc->msgType & 1U) {
if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (pRpc->handle != NULL && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
if (code != 0 && terrno != 0) code = terrno; if (code != 0 && terrno != 0) code = terrno;
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .contLen = pMsg->rspLen, .pCont = pMsg->pRsp}; mmSendRsp(pMsg, code);
tmsgSendRsp(&rsp);
} }
} }
@ -48,7 +56,7 @@ static void mmProcessQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) { static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pInfo->ahandle; SMnodeMgmt *pMgmt = pInfo->ahandle;
dTrace("msg:%p, get from mnode query queue", pMsg); dTrace("msg:%p, get from mnode-query queue", pMsg);
SRpcMsg *pRpc = &pMsg->rpcMsg; SRpcMsg *pRpc = &pMsg->rpcMsg;
int32_t code = -1; int32_t code = -1;
@ -58,8 +66,7 @@ static void mmProcessQueryQueue(SQueueInfo *pInfo, SNodeMsg *pMsg) {
if (pRpc->msgType & 1U) { if (pRpc->msgType & 1U) {
if (pRpc->handle != NULL && code != 0) { if (pRpc->handle != NULL && code != 0) {
dError("msg:%p, failed to process since %s", pMsg, terrstr()); dError("msg:%p, failed to process since %s", pMsg, terrstr());
SRpcMsg rsp = {.handle = pRpc->handle, .code = code, .ahandle = pRpc->ahandle}; mmSendRsp(pMsg, code);
tmsgSendRsp(&rsp);
} }
} }
@ -98,11 +105,8 @@ int32_t mmProcessQueryMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
} }
int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) { int32_t mmProcessMonitorMsg(SMgmtWrapper *pWrapper, SNodeMsg *pMsg) {
SMnodeMgmt *pMgmt = pWrapper->pMgmt; SMnodeMgmt *pMgmt = pWrapper->pMgmt;
SSingleWorker *pWorker = &pMgmt->monitorWorker; mmPutMsgToWorker(&pMgmt->monitorWorker, pMsg);
dTrace("msg:%p, put into worker:%s", pMsg, pWorker->name);
taosWriteQitem(pWorker->queue, pMsg);
return 0; return 0;
} }

View File

@ -100,6 +100,7 @@ typedef struct {
} SGrantInfo; } SGrantInfo;
typedef struct SMnode { typedef struct SMnode {
int32_t selfId;
int64_t clusterId; int64_t clusterId;
int8_t replica; int8_t replica;
int8_t selfIndex; int8_t selfIndex;

View File

@ -22,12 +22,14 @@
extern "C" { extern "C" {
#endif #endif
int32_t mndInitMnode(SMnode *pMnode); int32_t mndInitMnode(SMnode *pMnode);
void mndCleanupMnode(SMnode *pMnode); void mndCleanupMnode(SMnode *pMnode);
bool mndIsMnode(SMnode *pMnode, int32_t dnodeId); SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId);
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet); void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj);
char *mndGetRoleStr(int32_t role); bool mndIsMnode(SMnode *pMnode, int32_t dnodeId);
void mndUpdateMnodeRole(SMnode *pMnode); void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet);
void mndUpdateMnodeRole(SMnode *pMnode);
const char *mndGetRoleStr(int32_t role);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -396,7 +396,7 @@ static int32_t mndProcessDropBnodeReq(SNodeMsg *pReq) {
mDebug("bnode:%d, start to drop", dropReq.dnodeId); mDebug("bnode:%d, start to drop", dropReq.dnodeId);
if (dropReq.dnodeId <= 0) { if (dropReq.dnodeId <= 0) {
terrno = TSDB_CODE_SDB_APP_ERROR; terrno = TSDB_CODE_INVALID_MSG;
goto _OVER; goto _OVER;
} }

View File

@ -1040,7 +1040,7 @@ static void mndBuildDBVgroupInfo(SDbObj *pDb, SMnode *pMnode, SArray *pVgList) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL; void *pIter = NULL;
while (true) { while (1) {
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
if (pIter == NULL) break; if (pIter == NULL) break;

View File

@ -552,6 +552,7 @@ static int32_t mndProcessDropDnodeReq(SNodeMsg *pReq) {
int32_t code = -1; int32_t code = -1;
SUserObj *pUser = NULL; SUserObj *pUser = NULL;
SDnodeObj *pDnode = NULL; SDnodeObj *pDnode = NULL;
SMnodeObj *pMObj = NULL;
SMDropMnodeReq dropReq = {0}; SMDropMnodeReq dropReq = {0};
if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
@ -572,6 +573,12 @@ static int32_t mndProcessDropDnodeReq(SNodeMsg *pReq) {
goto DROP_DNODE_OVER; goto DROP_DNODE_OVER;
} }
pMObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
if (pMObj != NULL) {
terrno = TSDB_CODE_MND_MNODE_DEPLOYED;
goto DROP_DNODE_OVER;
}
pUser = mndAcquireUser(pMnode, pReq->user); pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) { if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
@ -592,6 +599,7 @@ DROP_DNODE_OVER:
mndReleaseDnode(pMnode, pDnode); mndReleaseDnode(pMnode, pDnode);
mndReleaseUser(pMnode, pUser); mndReleaseUser(pMnode, pUser);
mndReleaseMnode(pMnode, pMObj);
return code; return code;
} }

View File

@ -21,8 +21,8 @@
#include "mndTrans.h" #include "mndTrans.h"
#include "mndUser.h" #include "mndUser.h"
#define TSDB_MNODE_VER_NUMBER 1 #define MNODE_VER_NUMBER 1
#define TSDB_MNODE_RESERVE_SIZE 64 #define MNODE_RESERVE_SIZE 64
static int32_t mndCreateDefaultMnode(SMnode *pMnode); static int32_t mndCreateDefaultMnode(SMnode *pMnode);
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj); static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj);
@ -64,21 +64,20 @@ int32_t mndInitMnode(SMnode *pMnode) {
void mndCleanupMnode(SMnode *pMnode) {} void mndCleanupMnode(SMnode *pMnode) {}
static SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) { SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) {
SSdb *pSdb = pMnode->pSdb; SMnodeObj *pObj = sdbAcquire(pMnode->pSdb, SDB_MNODE, &mnodeId);
SMnodeObj *pObj = sdbAcquire(pSdb, SDB_MNODE, &mnodeId);
if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) { if (pObj == NULL && terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
terrno = TSDB_CODE_MND_MNODE_NOT_EXIST; terrno = TSDB_CODE_MND_MNODE_NOT_EXIST;
} }
return pObj; return pObj;
} }
static void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) { void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
sdbRelease(pSdb, pObj); sdbRelease(pMnode->pSdb, pObj);
} }
char *mndGetRoleStr(int32_t showType) { const char *mndGetRoleStr(int32_t showType) {
switch (showType) { switch (showType) {
case TAOS_SYNC_STATE_FOLLOWER: case TAOS_SYNC_STATE_FOLLOWER:
return "unsynced"; return "unsynced";
@ -130,18 +129,18 @@ static int32_t mndCreateDefaultMnode(SMnode *pMnode) {
static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) { static SSdbRaw *mndMnodeActionEncode(SMnodeObj *pObj) {
terrno = TSDB_CODE_OUT_OF_MEMORY; terrno = TSDB_CODE_OUT_OF_MEMORY;
SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, TSDB_MNODE_VER_NUMBER, sizeof(SMnodeObj) + TSDB_MNODE_RESERVE_SIZE); SSdbRaw *pRaw = sdbAllocRaw(SDB_MNODE, MNODE_VER_NUMBER, sizeof(SMnodeObj) + MNODE_RESERVE_SIZE);
if (pRaw == NULL) goto MNODE_ENCODE_OVER; if (pRaw == NULL) goto _OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_SET_INT32(pRaw, dataPos, pObj->id, MNODE_ENCODE_OVER) SDB_SET_INT32(pRaw, dataPos, pObj->id, _OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, MNODE_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pObj->createdTime, _OVER)
SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, MNODE_ENCODE_OVER) SDB_SET_INT64(pRaw, dataPos, pObj->updateTime, _OVER)
SDB_SET_RESERVE(pRaw, dataPos, TSDB_MNODE_RESERVE_SIZE, MNODE_ENCODE_OVER) SDB_SET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)
terrno = 0; terrno = 0;
MNODE_ENCODE_OVER: _OVER:
if (terrno != 0) { if (terrno != 0) {
mError("mnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr()); mError("mnode:%d, failed to encode to raw:%p since %s", pObj->id, pRaw, terrstr());
sdbFreeRaw(pRaw); sdbFreeRaw(pRaw);
@ -158,26 +157,26 @@ static SSdbRow *mndMnodeActionDecode(SSdbRaw *pRaw) {
int8_t sver = 0; int8_t sver = 0;
if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL; if (sdbGetRawSoftVer(pRaw, &sver) != 0) return NULL;
if (sver != TSDB_MNODE_VER_NUMBER) { if (sver != MNODE_VER_NUMBER) {
terrno = TSDB_CODE_SDB_INVALID_DATA_VER; terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
goto MNODE_DECODE_OVER; goto _OVER;
} }
SSdbRow *pRow = sdbAllocRow(sizeof(SMnodeObj)); SSdbRow *pRow = sdbAllocRow(sizeof(SMnodeObj));
if (pRow == NULL) goto MNODE_DECODE_OVER; if (pRow == NULL) goto _OVER;
SMnodeObj *pObj = sdbGetRowObj(pRow); SMnodeObj *pObj = sdbGetRowObj(pRow);
if (pObj == NULL) goto MNODE_DECODE_OVER; if (pObj == NULL) goto _OVER;
int32_t dataPos = 0; int32_t dataPos = 0;
SDB_GET_INT32(pRaw, dataPos, &pObj->id, MNODE_DECODE_OVER) SDB_GET_INT32(pRaw, dataPos, &pObj->id, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, MNODE_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pObj->createdTime, _OVER)
SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, MNODE_DECODE_OVER) SDB_GET_INT64(pRaw, dataPos, &pObj->updateTime, _OVER)
SDB_GET_RESERVE(pRaw, dataPos, TSDB_MNODE_RESERVE_SIZE, MNODE_DECODE_OVER) SDB_GET_RESERVE(pRaw, dataPos, MNODE_RESERVE_SIZE, _OVER)
terrno = 0; terrno = 0;
MNODE_DECODE_OVER: _OVER:
if (terrno != 0) { if (terrno != 0) {
mError("mnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr()); mError("mnode:%d, failed to decode from raw:%p since %s", pObj->id, pRaw, terrstr());
taosMemoryFreeClear(pRow); taosMemoryFreeClear(pRow);
@ -188,8 +187,6 @@ MNODE_DECODE_OVER:
return pRow; return pRow;
} }
static void mnodeResetMnode(SMnodeObj *pObj) { pObj->role = TAOS_SYNC_STATE_FOLLOWER; }
static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) { static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj); mTrace("mnode:%d, perform insert action, row:%p", pObj->id, pObj);
pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id); pObj->pDnode = sdbAcquire(pSdb, SDB_DNODE, &pObj->id);
@ -199,7 +196,7 @@ static int32_t mndMnodeActionInsert(SSdb *pSdb, SMnodeObj *pObj) {
return -1; return -1;
} }
mnodeResetMnode(pObj); pObj->role = TAOS_SYNC_STATE_FOLLOWER;
return 0; return 0;
} }
@ -233,7 +230,6 @@ bool mndIsMnode(SMnode *pMnode, int32_t dnodeId) {
void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
pEpSet->numOfEps = 0; pEpSet->numOfEps = 0;
void *pIter = NULL; void *pIter = NULL;
@ -241,14 +237,15 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) {
SMnodeObj *pObj = NULL; SMnodeObj *pObj = NULL;
pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj); pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pObj);
if (pIter == NULL) break; if (pIter == NULL) break;
if (pObj->pDnode == NULL) break; if (pObj->pDnode == NULL) {
mError("mnode:%d, no corresponding dnode exists", pObj->id);
if (pObj->role == TAOS_SYNC_STATE_LEADER) { } else {
pEpSet->inUse = pEpSet->numOfEps; if (pObj->role == TAOS_SYNC_STATE_LEADER) {
pEpSet->inUse = pEpSet->numOfEps;
}
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pSdb, pObj);
} }
addEpIntoEpSet(pEpSet, pObj->pDnode->fqdn, pObj->pDnode->port);
sdbRelease(pSdb, pObj);
} }
} }
@ -364,18 +361,18 @@ static int32_t mndCreateMnode(SMnode *pMnode, SNodeMsg *pReq, SDnodeObj *pDnode,
mnodeObj.updateTime = mnodeObj.createdTime; mnodeObj.updateTime = mnodeObj.createdTime;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_MNODE, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_CREATE_MNODE, &pReq->rpcMsg);
if (pTrans == NULL) goto CREATE_MNODE_OVER; if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId); mDebug("trans:%d, used to create mnode:%d", pTrans->id, pCreate->dnodeId);
if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto CREATE_MNODE_OVER; if (mndSetCreateMnodeRedoLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto CREATE_MNODE_OVER; if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeObj) != 0) goto _OVER;
if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto CREATE_MNODE_OVER; if (mndSetCreateMnodeRedoActions(pMnode, pTrans, pDnode, &mnodeObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto CREATE_MNODE_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
CREATE_MNODE_OVER: _OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
} }
@ -390,7 +387,7 @@ static int32_t mndProcessCreateMnodeReq(SNodeMsg *pReq) {
if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &createReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto CREATE_MNODE_OVER; goto _OVER;
} }
mDebug("mnode:%d, start to create", createReq.dnodeId); mDebug("mnode:%d, start to create", createReq.dnodeId);
@ -398,31 +395,31 @@ static int32_t mndProcessCreateMnodeReq(SNodeMsg *pReq) {
pObj = mndAcquireMnode(pMnode, createReq.dnodeId); pObj = mndAcquireMnode(pMnode, createReq.dnodeId);
if (pObj != NULL) { if (pObj != NULL) {
terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST; terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST;
goto CREATE_MNODE_OVER; goto _OVER;
} else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) { } else if (terrno != TSDB_CODE_MND_MNODE_NOT_EXIST) {
goto CREATE_MNODE_OVER; goto _OVER;
} }
pDnode = mndAcquireDnode(pMnode, createReq.dnodeId); pDnode = mndAcquireDnode(pMnode, createReq.dnodeId);
if (pDnode == NULL) { if (pDnode == NULL) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
goto CREATE_MNODE_OVER; goto _OVER;
} }
pUser = mndAcquireUser(pMnode, pReq->user); pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) { if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto CREATE_MNODE_OVER; goto _OVER;
} }
if (mndCheckNodeAuth(pUser)) { if (mndCheckNodeAuth(pUser)) {
goto CREATE_MNODE_OVER; goto _OVER;
} }
code = mndCreateMnode(pMnode, pReq, pDnode, &createReq); code = mndCreateMnode(pMnode, pReq, pDnode, &createReq);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
CREATE_MNODE_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("mnode:%d, failed to create since %s", createReq.dnodeId, terrstr()); mError("mnode:%d, failed to create since %s", createReq.dnodeId, terrstr());
} }
@ -531,18 +528,18 @@ static int32_t mndDropMnode(SMnode *pMnode, SNodeMsg *pReq, SMnodeObj *pObj) {
int32_t code = -1; int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_MNODE, &pReq->rpcMsg); STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_TYPE_DROP_MNODE, &pReq->rpcMsg);
if (pTrans == NULL) goto DROP_MNODE_OVER; if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id); mDebug("trans:%d, used to drop mnode:%d", pTrans->id, pObj->id);
if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto DROP_MNODE_OVER; if (mndSetDropMnodeRedoLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto DROP_MNODE_OVER; if (mndSetDropMnodeCommitLogs(pMnode, pTrans, pObj) != 0) goto _OVER;
if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto DROP_MNODE_OVER; if (mndSetDropMnodeRedoActions(pMnode, pTrans, pObj->pDnode, pObj) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto DROP_MNODE_OVER; if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0; code = 0;
DROP_MNODE_OVER: _OVER:
mndTransDrop(pTrans); mndTransDrop(pTrans);
return code; return code;
} }
@ -556,35 +553,45 @@ static int32_t mndProcessDropMnodeReq(SNodeMsg *pReq) {
if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) { if (tDeserializeSCreateDropMQSBNodeReq(pReq->rpcMsg.pCont, pReq->rpcMsg.contLen, &dropReq) != 0) {
terrno = TSDB_CODE_INVALID_MSG; terrno = TSDB_CODE_INVALID_MSG;
goto DROP_MNODE_OVER; goto _OVER;
} }
mDebug("mnode:%d, start to drop", dropReq.dnodeId); mDebug("mnode:%d, start to drop", dropReq.dnodeId);
if (dropReq.dnodeId <= 0) { if (dropReq.dnodeId <= 0) {
terrno = TSDB_CODE_SDB_APP_ERROR; terrno = TSDB_CODE_INVALID_MSG;
goto DROP_MNODE_OVER; goto _OVER;
} }
pObj = mndAcquireMnode(pMnode, dropReq.dnodeId); pObj = mndAcquireMnode(pMnode, dropReq.dnodeId);
if (pObj == NULL) { if (pObj == NULL) {
goto DROP_MNODE_OVER; goto _OVER;
}
if (pMnode->selfId == dropReq.dnodeId) {
terrno = TSDB_CODE_MND_CANT_DROP_MASTER;
goto _OVER;
}
if (sdbGetSize(pMnode->pSdb, SDB_MNODE) <= 1) {
terrno = TSDB_CODE_MND_TOO_FEW_MNODES;
goto _OVER;
} }
pUser = mndAcquireUser(pMnode, pReq->user); pUser = mndAcquireUser(pMnode, pReq->user);
if (pUser == NULL) { if (pUser == NULL) {
terrno = TSDB_CODE_MND_NO_USER_FROM_CONN; terrno = TSDB_CODE_MND_NO_USER_FROM_CONN;
goto DROP_MNODE_OVER; goto _OVER;
} }
if (mndCheckNodeAuth(pUser)) { if (mndCheckNodeAuth(pUser)) {
goto DROP_MNODE_OVER; goto _OVER;
} }
code = mndDropMnode(pMnode, pReq, pObj); code = mndDropMnode(pMnode, pReq, pObj);
if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS; if (code == 0) code = TSDB_CODE_MND_ACTION_IN_PROGRESS;
DROP_MNODE_OVER: _OVER:
if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) { if (code != 0 && code != TSDB_CODE_MND_ACTION_IN_PROGRESS) {
mError("mnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr()); mError("mnode:%d, failed to drop since %s", dropReq.dnodeId, terrstr());
} }
@ -687,7 +694,7 @@ static int32_t mndRetrieveMnodes(SNodeMsg *pReq, SShowObj *pShow, char *data, in
cols++; cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
char *roles = mndGetRoleStr(pObj->role); const char *roles = mndGetRoleStr(pObj->role);
STR_WITH_MAXSIZE_TO_VARSTR(pWrite, roles, pShow->bytes[cols]); STR_WITH_MAXSIZE_TO_VARSTR(pWrite, roles, pShow->bytes[cols]);
cols++; cols++;

View File

@ -398,7 +398,7 @@ static int32_t mndProcessDropQnodeReq(SNodeMsg *pReq) {
mDebug("qnode:%d, start to drop", dropReq.dnodeId); mDebug("qnode:%d, start to drop", dropReq.dnodeId);
if (dropReq.dnodeId <= 0) { if (dropReq.dnodeId <= 0) {
terrno = TSDB_CODE_SDB_APP_ERROR; terrno = TSDB_CODE_INVALID_MSG;
goto _OVER; goto _OVER;
} }
@ -453,7 +453,7 @@ static int32_t mndProcessQnodeListReq(SNodeMsg *pReq) {
goto _OVER; goto _OVER;
} }
while (true) { while (1) {
void *pIter = sdbFetch(pSdb, SDB_QNODE, NULL, (void **)&pObj); void *pIter = sdbFetch(pSdb, SDB_QNODE, NULL, (void **)&pObj);
if (pIter == NULL) break; if (pIter == NULL) break;

View File

@ -406,7 +406,7 @@ static int32_t mndProcessDropSnodeReq(SNodeMsg *pReq) {
mDebug("snode:%d, start to drop", dropReq.dnodeId); mDebug("snode:%d, start to drop", dropReq.dnodeId);
if (dropReq.dnodeId <= 0) { if (dropReq.dnodeId <= 0) {
terrno = TSDB_CODE_SDB_APP_ERROR; terrno = TSDB_CODE_INVALID_MSG;
goto _OVER; goto _OVER;
} }

View File

@ -105,7 +105,6 @@ static int32_t mndRestoreWal(SMnode *pMnode) {
if (walEndSnapshot(pWal) < 0) { if (walEndSnapshot(pWal) < 0) {
goto WAL_RESTORE_OVER; goto WAL_RESTORE_OVER;
} }
} }
code = 0; code = 0;
@ -129,7 +128,9 @@ int32_t mndInitSync(SMnode *pMnode) {
return -1; return -1;
} }
pMgmt->state = TAOS_SYNC_STATE_LEADER; if (pMnode->selfId == 1) {
pMgmt->state = TAOS_SYNC_STATE_LEADER;
}
pMgmt->pSyncNode = NULL; pMgmt->pSyncNode = NULL;
return 0; return 0;
} }

View File

@ -215,7 +215,6 @@ static int32_t mndInitSteps(SMnode *pMnode, bool deploy) {
} else { } else {
if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1; if (mndAllocStep(pMnode, "mnode-sdb-read", mndReadSdb, NULL) != 0) return -1;
} }
// if (mndAllocStep(pMnode, "mnode-timer", mndInitTimer, NULL) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1; if (mndAllocStep(pMnode, "mnode-profile", mndInitProfile, mndCleanupProfile) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1; if (mndAllocStep(pMnode, "mnode-show", mndInitShow, mndCleanupShow) != 0) return -1;
if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1; if (mndAllocStep(pMnode, "mnode-query", mndInitQuery, mndCleanupQuery) != 0) return -1;
@ -272,6 +271,7 @@ static void mndSetOptions(SMnode *pMnode, const SMnodeOpt *pOption) {
pMnode->selfIndex = pOption->selfIndex; pMnode->selfIndex = pOption->selfIndex;
memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); memcpy(&pMnode->replicas, pOption->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA);
pMnode->msgCb = pOption->msgCb; pMnode->msgCb = pOption->msgCb;
pMnode->selfId = pOption->replicas[pOption->selfIndex].id;
} }
SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) { SMnode *mndOpen(const char *path, const SMnodeOpt *pOption) {

View File

@ -201,6 +201,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_ALREADY_EXIST, "Snode already exists"
TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_NOT_EXIST, "Snode not there") TAOS_DEFINE_ERROR(TSDB_CODE_MND_SNODE_NOT_EXIST, "Snode not there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_BNODE_ALREADY_EXIST, "Bnode already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_BNODE_ALREADY_EXIST, "Bnode already exists")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_BNODE_NOT_EXIST, "Bnode not there") TAOS_DEFINE_ERROR(TSDB_CODE_MND_BNODE_NOT_EXIST, "Bnode not there")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_TOO_FEW_MNODES, "Too few mnodes")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_MNODE_DEPLOYED, "Mnode deployed in this dnode")
TAOS_DEFINE_ERROR(TSDB_CODE_MND_CANT_DROP_MASTER, "Can't drop mnode which is master")
// mnode-acct // mnode-acct
TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, "Account already exists") TAOS_DEFINE_ERROR(TSDB_CODE_MND_ACCT_ALREADY_EXIST, "Account already exists")