From 722ffd6814c6fd2f1cc5b6a1b0ac7406cb88cada Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Wed, 8 Dec 2021 19:34:54 +0800 Subject: [PATCH] TD-10431 mnode manage --- include/common/taosmsg.h | 12 +- source/dnode/mgmt/impl/src/dndMnode.c | 13 +- source/dnode/mnode/impl/inc/mndMnode.h | 1 + source/dnode/mnode/impl/src/mndDnode.c | 5 +- source/dnode/mnode/impl/src/mndMnode.c | 304 +++++++++++++++++++++++- source/dnode/mnode/impl/src/mndVgroup.c | 5 +- 6 files changed, 325 insertions(+), 15 deletions(-) diff --git a/include/common/taosmsg.h b/include/common/taosmsg.h index 08d8eacce9..5c1787472a 100644 --- a/include/common/taosmsg.h +++ b/include/common/taosmsg.h @@ -840,12 +840,20 @@ typedef struct { char config[128]; } SCfgDnodeMsg; +typedef struct { + int32_t dnodeId; +} SCreateMnodeMsg, SDropMnodeMsg; + typedef struct { int32_t dnodeId; + int8_t align[3]; int8_t replica; - int8_t reserved[3]; SReplica replicas[TSDB_MAX_REPLICA]; -} SCreateMnodeMsg, SAlterMnodeMsg, SDropMnodeMsg; +} SCreateMnodeInMsg, SAlterMnodeInMsg; + +typedef struct { + int32_t dnodeId; +} SDropMnodeInMsg; typedef struct { int32_t dnodeId; diff --git a/source/dnode/mgmt/impl/src/dndMnode.c b/source/dnode/mgmt/impl/src/dndMnode.c index 374a2f2b2c..d3336fb079 100644 --- a/source/dnode/mgmt/impl/src/dndMnode.c +++ b/source/dnode/mgmt/impl/src/dndMnode.c @@ -366,7 +366,7 @@ static void dndBuildMnodeOpenOption(SDnode *pDnode, SMnodeOpt *pOption) { memcpy(&pOption->replicas, pMgmt->replicas, sizeof(SReplica) * TSDB_MAX_REPLICA); } -static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SCreateMnodeMsg *pMsg) { +static int32_t dndBuildMnodeOptionFromMsg(SDnode *pDnode, SMnodeOpt *pOption, SCreateMnodeInMsg *pMsg) { dndInitMnodeOption(pDnode, pOption); pOption->dnodeId = dndGetDnodeId(pDnode); pOption->clusterId = dndGetClusterId(pDnode); @@ -488,8 +488,8 @@ static int32_t dndDropMnode(SDnode *pDnode) { return 0; } -static SCreateMnodeMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { - SCreateMnodeMsg *pMsg = pRpcMsg->pCont; +static SCreateMnodeInMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { + SCreateMnodeInMsg *pMsg = pRpcMsg->pCont; pMsg->dnodeId = htonl(pMsg->dnodeId); for (int32_t i = 0; i < pMsg->replica; ++i) { pMsg->replicas[i].id = htonl(pMsg->replicas[i].id); @@ -500,7 +500,7 @@ static SCreateMnodeMsg *dndParseCreateMnodeMsg(SRpcMsg *pRpcMsg) { } static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SCreateMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg->pCont); + SCreateMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg->pCont); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; @@ -516,7 +516,7 @@ static int32_t dndProcessCreateMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SAlterMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg->pCont); + SAlterMnodeInMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg->pCont); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; @@ -531,7 +531,8 @@ static int32_t dndProcessAlterMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { } static int32_t dndProcessDropMnodeReq(SDnode *pDnode, SRpcMsg *pRpcMsg) { - SDropMnodeMsg *pMsg = dndParseCreateMnodeMsg(pRpcMsg->pCont); + SDropMnodeInMsg *pMsg = pRpcMsg->pCont; + pMsg->dnodeId = htonl(pMsg->dnodeId); if (pMsg->dnodeId != dndGetDnodeId(pDnode)) { terrno = TSDB_CODE_DND_MNODE_ID_INVALID; diff --git a/source/dnode/mnode/impl/inc/mndMnode.h b/source/dnode/mnode/impl/inc/mndMnode.h index 64bbb13b60..906d11aec2 100644 --- a/source/dnode/mnode/impl/inc/mndMnode.h +++ b/source/dnode/mnode/impl/inc/mndMnode.h @@ -26,6 +26,7 @@ int32_t mndInitMnode(SMnode *pMnode); void mndCleanupMnode(SMnode *pMnode); bool mndIsMnode(SMnode *pMnode, int32_t dnodeId); void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet); +char *mndGetRoleStr(int32_t role); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 358b07c967..c34d12c6e2 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -523,7 +523,7 @@ static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg) { if (pDrop->dnodeId <= 0) { terrno = TSDB_CODE_SDB_APP_ERROR; - mError("dnode:%d, failed to create since %s", pDrop->dnodeId, terrstr()); + mError("dnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr()); return -1; } @@ -537,7 +537,7 @@ static int32_t mndProcessDropDnodeMsg(SMnodeMsg *pMsg) { int32_t code = mndDropDnode(pMnode, pMsg, pDnode); if (code != 0) { - mError("dnode:%d, failed to create since %s", pDrop->dnodeId, terrstr()); + mError("dnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr()); return -1; } @@ -762,6 +762,7 @@ static int32_t mndRetrieveDnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); pShow->numOfReads += numOfRows; + return numOfRows; } diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index be66db3b01..d295e232a3 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -14,6 +14,9 @@ */ #define _DEFAULT_SOURCE +#include "mndMnode.h" +#include "mndDnode.h" +#include "mndShow.h" #include "mndTrans.h" #define SDB_MNODE_VER 1 @@ -26,6 +29,11 @@ static int32_t mndMnodeActionDelete(SSdb *pSdb, SMnodeObj *pMnodeObj); static int32_t mndMnodeActionUpdate(SSdb *pSdb, SMnodeObj *pOldMnode, SMnodeObj *pNewMnode); static int32_t mndProcessCreateMnodeMsg(SMnodeMsg *pMsg); static int32_t mndProcessDropMnodeMsg(SMnodeMsg *pMsg); +static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg); +static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg); +static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); +static int32_t mndRetrieveMnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); +static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter); int32_t mndInitMnode(SMnode *pMnode) { SSdbTable table = {.sdbType = SDB_MNODE, @@ -39,12 +47,41 @@ int32_t mndInitMnode(SMnode *pMnode) { mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE, mndProcessCreateMnodeMsg); mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE, mndProcessDropMnodeMsg); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_CREATE_MNODE_IN_RSP, mndProcessCreateMnodeRsp); + mndSetMsgHandle(pMnode, TSDB_MSG_TYPE_DROP_MNODE_IN_RSP, mndProcessDropMnodeRsp); + + mndAddShowMetaHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndGetMnodeMeta); + mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes); + mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode); return sdbSetTable(pMnode->pSdb, table); } void mndCleanupMnode(SMnode *pMnode) {} +static SMnodeObj *mndAcquireMnode(SMnode *pMnode, int32_t mnodeId) { + SSdb *pSdb = pMnode->pSdb; + return sdbAcquire(pSdb, SDB_MNODE, &mnodeId); +} + +static void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pMnodeObj) { + SSdb *pSdb = pMnode->pSdb; + sdbRelease(pSdb, pMnodeObj); +} + +char *mndGetRoleStr(int32_t showType) { + switch (showType) { + case TAOS_SYNC_STATE_FOLLOWER: + return "unsynced"; + case TAOS_SYNC_STATE_CANDIDATE: + return "slave"; + case TAOS_SYNC_STATE_LEADER: + return "master"; + default: + return "undefined"; + } +} + static int32_t mndCreateDefaultMnode(SMnode *pMnode) { SMnodeObj mnodeObj = {0}; mnodeObj.id = 1; @@ -164,6 +201,269 @@ void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet) { } } -static int32_t mndProcessCreateMnodeMsg(SMnodeMsg *pMsg) { return 0; } +static int32_t mndCreateMnode(SMnode *pMnode, SMnodeMsg *pMsg, SCreateMnodeMsg *pCreate) { + SMnodeObj mnodeObj = {0}; + mnodeObj.id = 1; // todo + mnodeObj.createdTime = taosGetTimestampMs(); + mnodeObj.updateTime = mnodeObj.createdTime; -static int32_t mndProcessDropMnodeMsg(SMnodeMsg *pMsg) { return 0; } + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + if (pTrans == NULL) { + mError("dnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); + return -1; + } + mDebug("trans:%d, used to create dnode:%d", pTrans->id, pCreate->dnodeId); + + SSdbRaw *pRedoRaw = mndMnodeActionEncode(&mnodeObj); + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { + mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING); + + SSdbRaw *pUndoRaw = mndMnodeActionEncode(&mnodeObj); + if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { + mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED); + + SSdbRaw *pCommitRaw = mndMnodeActionEncode(&mnodeObj); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY); + + if (mndTransPrepare(pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + mndTransDrop(pTrans); + return 0; +} + +static int32_t mndProcessCreateMnodeMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SCreateMnodeMsg *pCreate = pMsg->rpcMsg.pCont; + + pCreate->dnodeId = htonl(pCreate->dnodeId); + + mDebug("mnode:%d, start to create", pCreate->dnodeId); + + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pCreate->dnodeId); + if (pDnode == NULL) { + mError("mnode:%d, dnode not exist", pDnode->id); + terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + return -1; + } + mndReleaseDnode(pMnode, pDnode); + + SMnodeObj *pMnodeObj = mndAcquireMnode(pMnode, pCreate->dnodeId); + if (pMnodeObj != NULL) { + mError("mnode:%d, mnode already exist", pMnodeObj->id); + terrno = TSDB_CODE_MND_MNODE_ALREADY_EXIST; + return -1; + } + + int32_t code = mndCreateMnode(pMnode, pMsg, pCreate); + + if (code != 0) { + mError("mnode:%d, failed to create since %s", pCreate->dnodeId, terrstr()); + return -1; + } + + return TSDB_CODE_MND_ACTION_IN_PROGRESS; +} + +static int32_t mndDropMnode(SMnode *pMnode, SMnodeMsg *pMsg, SMnodeObj *pMnodeObj) { + STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, pMsg->rpcMsg.handle); + if (pTrans == NULL) { + mError("mnode:%d, failed to drop since %s", pMnodeObj->id, terrstr()); + return -1; + } + mDebug("trans:%d, used to drop user:%d", pTrans->id, pMnodeObj->id); + + SSdbRaw *pRedoRaw = mndMnodeActionEncode(pMnodeObj); + if (pRedoRaw == NULL || mndTransAppendRedolog(pTrans, pRedoRaw) != 0) { + mError("trans:%d, failed to append redo log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING); + + SSdbRaw *pUndoRaw = mndMnodeActionEncode(pMnodeObj); + if (pUndoRaw == NULL || mndTransAppendUndolog(pTrans, pUndoRaw) != 0) { + mError("trans:%d, failed to append undo log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY); + + SSdbRaw *pCommitRaw = mndMnodeActionEncode(pMnodeObj); + if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) { + mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED); + + if (mndTransPrepare(pTrans) != 0) { + mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr()); + mndTransDrop(pTrans); + return -1; + } + + mndTransDrop(pTrans); + return 0; +} + +static int32_t mndProcessDropMnodeMsg(SMnodeMsg *pMsg) { + SMnode *pMnode = pMsg->pMnode; + SDropMnodeMsg *pDrop = pMsg->rpcMsg.pCont; + pDrop->dnodeId = htonl(pDrop->dnodeId); + + mDebug("mnode:%d, start to drop", pDrop->dnodeId); + + if (pDrop->dnodeId <= 0) { + terrno = TSDB_CODE_SDB_APP_ERROR; + mError("mnode:%d, failed to drop since %s", pDrop->dnodeId, terrstr()); + return -1; + } + + SMnodeObj *pMnodeObj = mndAcquireMnode(pMnode, pDrop->dnodeId); + if (pMnodeObj == NULL) { + mError("mnode:%d, not exist", pDrop->dnodeId); + terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; + return -1; + } + + int32_t code = mndDropMnode(pMnode, pMsg, pMnodeObj); + + if (code != 0) { + mError("mnode:%d, failed to drop since %s", pMnode->dnodeId, terrstr()); + return -1; + } + + sdbRelease(pMnode->pSdb, pMnode); + return TSDB_CODE_MND_ACTION_IN_PROGRESS; +} + +static int32_t mndProcessCreateMnodeRsp(SMnodeMsg *pMsg) { return 0; } + +static int32_t mndProcessDropMnodeRsp(SMnodeMsg *pMsg) { return 0; } + +static int32_t mndGetMnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + + int32_t cols = 0; + SSchema *pSchema = pMeta->schema; + + pShow->bytes[cols] = 2; + pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT; + strcpy(pSchema[cols].name, "id"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = TSDB_EP_LEN + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "end point"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 12 + VARSTR_HEADER_SIZE; + pSchema[cols].type = TSDB_DATA_TYPE_BINARY; + strcpy(pSchema[cols].name, "role"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "role time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pShow->bytes[cols] = 8; + pSchema[cols].type = TSDB_DATA_TYPE_TIMESTAMP; + strcpy(pSchema[cols].name, "create time"); + pSchema[cols].bytes = htons(pShow->bytes[cols]); + cols++; + + pMeta->numOfColumns = htons(cols); + pShow->numOfColumns = cols; + + pShow->offset[0] = 0; + for (int32_t i = 1; i < cols; ++i) { + pShow->offset[i] = pShow->offset[i - 1] + pShow->bytes[i - 1]; + } + + pShow->numOfRows = sdbGetSize(pSdb, SDB_MNODE); + pShow->rowSize = pShow->offset[cols - 1] + pShow->bytes[cols - 1]; + strcpy(pMeta->tableFname, mndShowStr(pShow->type)); + + return 0; +} + +static int32_t mndRetrieveMnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows) { + SMnode *pMnode = pMsg->pMnode; + SSdb *pSdb = pMnode->pSdb; + int32_t numOfRows = 0; + int32_t cols = 0; + SMnodeObj *pMnodeObj = NULL; + char *pWrite; + + while (numOfRows < rows) { + pShow->pIter = sdbFetch(pSdb, SDB_MNODE, pShow->pIter, (void **)&pMnodeObj); + if (pShow->pIter == NULL) break; + + cols = 0; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int16_t *)pWrite = pMnodeObj->id; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + + SDnodeObj *pDnode = mndAcquireDnode(pMnode, pMnodeObj->id); + if (pDnode != NULL) { + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, pDnode->ep, pShow->bytes[cols]); + } else { + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, "invalid ep", pShow->bytes[cols]); + } + mndReleaseDnode(pMnode, pDnode); + + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + char *roles = mndGetRoleStr(pMnodeObj->role); + STR_WITH_MAXSIZE_TO_VARSTR(pWrite, roles, pShow->bytes[cols]); + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pMnodeObj->roleTime; + cols++; + + pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; + *(int64_t *)pWrite = pMnodeObj->createdTime; + cols++; + + numOfRows++; + sdbRelease(pSdb, pMnodeObj); + } + + mnodeVacuumResult(data, pShow->numOfColumns, numOfRows, rows, pShow); + pShow->numOfReads += numOfRows; + + return numOfRows; +} + +static void mndCancelGetNextMnode(SMnode *pMnode, void *pIter) { + SSdb *pSdb = pMnode->pSdb; + sdbCancelFetch(pSdb, pIter); +} diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index eb820fa3c2..68ee373d8c 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -16,12 +16,11 @@ #define _DEFAULT_SOURCE #include "mndVgroup.h" #include "mndDnode.h" +#include "mndMnode.h" #include "mndShow.h" #include "mndTrans.h" #include "ttime.h" -static char *syncRole[] = {"unsynced", "slave", "master"}; - static int32_t mndGetVnodeMeta(SMnodeMsg *pMsg, SShowObj *pShow, STableMetaMsg *pMeta); static int32_t mndRetrieveVnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, int32_t rows); static void mndCancelGetNextVnode(SMnode *pMnode, void *pIter); @@ -108,7 +107,7 @@ static int32_t mndRetrieveVnodes(SMnodeMsg *pMsg, SShowObj *pShow, char *data, i cols++; pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows; - STR_TO_VARSTR(pWrite, syncRole[pVgid->role]); + STR_TO_VARSTR(pWrite, mndGetRoleStr(pVgid->role)); cols++; numOfRows++; }