From 843d606058bd15a99038bbaa2bd36c6f192e1f67 Mon Sep 17 00:00:00 2001 From: cadem Date: Tue, 9 May 2023 17:15:37 +0800 Subject: [PATCH] feat/restore dnode --- include/common/tmsg.h | 17 ++ include/libs/tfs/tfs.h | 10 ++ include/util/taoserror.h | 1 + source/common/src/tmsg.c | 31 ++++ source/dnode/mgmt/mgmt_vnode/src/vmHandle.c | 17 +- source/dnode/mnode/impl/inc/mndMnode.h | 4 + source/dnode/mnode/impl/inc/mndQnode.h | 3 + source/dnode/mnode/impl/inc/mndVgroup.h | 3 + source/dnode/mnode/impl/src/mndDnode.c | 163 ++++++++++++++++++++ source/dnode/mnode/impl/src/mndMnode.c | 108 ++++++++++++- source/dnode/mnode/impl/src/mndQnode.c | 8 +- source/dnode/mnode/impl/src/mndVgroup.c | 101 ++++++++++++ source/libs/tfs/src/tfs.c | 8 + source/util/src/terror.c | 1 + 14 files changed, 471 insertions(+), 4 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index f377ad0d63..5ba996aceb 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1625,6 +1625,23 @@ typedef struct { int32_t tSerializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq); int32_t tDeserializeSDropDnodeReq(void* buf, int32_t bufLen, SDropDnodeReq* pReq); +enum { + RESTORE_TYPE__ALL = 1, + RESTORE_TYPE__MNODE, + RESTORE_TYPE__VNODE, + RESTORE_TYPE__QNODE, +}; + +typedef struct { + int32_t dnodeId; + char fqdn[TSDB_FQDN_LEN]; + int32_t port; + int8_t restoreType; +} SRestoreDnodeReq; + +int32_t tSerializeSRestoreDnodeReq(void* buf, int32_t bufLen, SRestoreDnodeReq* pReq); +int32_t tDeserializeSRestoreDnodeReq(void* buf, int32_t bufLen, SRestoreDnodeReq* pReq); + typedef struct { int32_t dnodeId; char config[TSDB_DNODE_CONFIG_LEN]; diff --git a/include/libs/tfs/tfs.h b/include/libs/tfs/tfs.h index cbf1d60e35..622cd615b8 100644 --- a/include/libs/tfs/tfs.h +++ b/include/libs/tfs/tfs.h @@ -133,6 +133,16 @@ int32_t tfsMkdirAt(STfs *pTfs, const char *rname, SDiskID diskId); */ int32_t tfsMkdirRecurAt(STfs *pTfs, const char *rname, SDiskID diskId); +/** + * @brief check directories exist in tfs. + * + * @param pTfs The fs object. + * @param rname The rel name of directory. + * @param diskId The disk ID. + * @return true for exist, false for not exist. + */ +bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId); + /** * @brief Remove directory at all levels in tfs. * diff --git a/include/util/taoserror.h b/include/util/taoserror.h index a709ccf10c..309b915845 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -442,6 +442,7 @@ int32_t* taosGetErrno(); #define TSDB_CODE_VND_QUERY_BUSY TAOS_DEF_ERROR_CODE(0, 0x0531) #define TSDB_CODE_VND_NOT_CATCH_UP TAOS_DEF_ERROR_CODE(0, 0x0532) // internal #define TSDB_CODE_VND_ALREADY_IS_VOTER TAOS_DEF_ERROR_CODE(0, 0x0533) // internal +#define TSDB_CODE_VND_DIR_ALREADY_EXIST TAOS_DEF_ERROR_CODE(0, 0x0534) // tsdb #define TSDB_CODE_TDB_INVALID_TABLE_ID TAOS_DEF_ERROR_CODE(0, 0x0600) diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 324e6ff37b..18b6dc174f 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -1720,6 +1720,37 @@ int32_t tDeserializeSDropDnodeReq(void *buf, int32_t bufLen, SDropDnodeReq *pReq return 0; } +int32_t tSerializeSRestoreDnodeReq(void *buf, int32_t bufLen, SRestoreDnodeReq *pReq) { + SEncoder encoder = {0}; + tEncoderInit(&encoder, buf, bufLen); + + if (tStartEncode(&encoder) < 0) return -1; + if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1; + if (tEncodeCStr(&encoder, pReq->fqdn) < 0) return -1; + if (tEncodeI32(&encoder, pReq->port) < 0) return -1; + if (tEncodeI8(&encoder, pReq->restoreType) < 0) return -1; + tEndEncode(&encoder); + + int32_t tlen = encoder.pos; + tEncoderClear(&encoder); + return tlen; +} + +int32_t tDeserializeSRestoreDnodeReq(void *buf, int32_t bufLen, SRestoreDnodeReq *pReq) { + SDecoder decoder = {0}; + tDecoderInit(&decoder, buf, bufLen); + + if (tStartDecode(&decoder) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1; + if (tDecodeCStrTo(&decoder, pReq->fqdn) < 0) return -1; + if (tDecodeI32(&decoder, &pReq->port) < 0) return -1; + if (tDecodeI8(&decoder, &pReq->restoreType) < 0) return -1; + tEndDecode(&decoder); + + tDecoderClear(&decoder); + return 0; +} + int32_t tSerializeSMCfgDnodeReq(void *buf, int32_t bufLen, SMCfgDnodeReq *pReq) { SEncoder encoder = {0}; tEncoderInit(&encoder, buf, bufLen); diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c index 9dbc12cf62..179fe17cc0 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmHandle.c @@ -245,6 +245,22 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { vmGenerateVnodeCfg(&req, &vnodeCfg); + snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId); + + if (pMgmt->pTfs) { + if (tfsDirExistAt(pMgmt->pTfs, path, (SDiskID){0})) { + terrno = TSDB_CODE_VND_DIR_ALREADY_EXIST; + dError("vgId:%d, failed to restore vnode since %s", req.vgId, terrstr()); + return -1; + } + } else { + if (taosDirExist(path)) { + terrno = TSDB_CODE_VND_DIR_ALREADY_EXIST; + dError("vgId:%d, failed to restore vnode since %s", req.vgId, terrstr()); + return -1; + } + } + if (vmTsmaAdjustDays(&vnodeCfg, &req) < 0) { dError("vgId:%d, failed to adjust tsma days since %s", req.vgId, terrstr()); code = terrno; @@ -263,7 +279,6 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { return 0; } - snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, vnodeCfg.vgId); if (vnodeCreate(path, &vnodeCfg, pMgmt->pTfs) < 0) { tFreeSCreateVnodeReq(&req); dError("vgId:%d, failed to create vnode since %s", req.vgId, terrstr()); diff --git a/source/dnode/mnode/impl/inc/mndMnode.h b/source/dnode/mnode/impl/inc/mndMnode.h index 320d3651f0..44eddb0617 100644 --- a/source/dnode/mnode/impl/inc/mndMnode.h +++ b/source/dnode/mnode/impl/inc/mndMnode.h @@ -29,6 +29,10 @@ void mndReleaseMnode(SMnode *pMnode, SMnodeObj *pObj); bool mndIsMnode(SMnode *pMnode, int32_t dnodeId); void mndGetMnodeEpSet(SMnode *pMnode, SEpSet *pEpSet); int32_t mndSetDropMnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj, bool force); +int32_t mndSetRestoreCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj); +int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj); +int32_t mndSetRestoreAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj); +int32_t mndSetRestoreCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndQnode.h b/source/dnode/mnode/impl/inc/mndQnode.h index 36eebd3157..d4f364d821 100644 --- a/source/dnode/mnode/impl/inc/mndQnode.h +++ b/source/dnode/mnode/impl/inc/mndQnode.h @@ -30,6 +30,9 @@ SQnodeObj *mndAcquireQnode(SMnode *pMnode, int32_t qnodeId); void mndReleaseQnode(SMnode *pMnode, SQnodeObj *pObj); int32_t mndCreateQnodeList(SMnode *pMnode, SArray **pList, int32_t limit); int32_t mndSetDropQnodeInfoToTrans(SMnode *pMnode, STrans *pTrans, SQnodeObj *pObj, bool force); +bool mndQnodeInDnode(SQnodeObj *pQnode, int32_t dnodeId); +int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj); +int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj); #ifdef __cplusplus } diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 94c4eae83f..2ece0da5eb 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -49,6 +49,9 @@ int32_t mndBuildCompactVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, void *mndBuildCreateVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); void *mndBuildDropVnodeReq(SMnode *, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen); bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid); +bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId); +int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, + SDnodeObj *pDnode); int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgroup); diff --git a/source/dnode/mnode/impl/src/mndDnode.c b/source/dnode/mnode/impl/src/mndDnode.c index 1f58ae97a3..3988b9d676 100644 --- a/source/dnode/mnode/impl/src/mndDnode.c +++ b/source/dnode/mnode/impl/src/mndDnode.c @@ -294,6 +294,11 @@ int32_t mndGetDnodeSize(SMnode *pMnode) { return sdbGetSize(pSdb, SDB_DNODE); } +int32_t mndGetDbSize(SMnode *pMnode) { + SSdb *pSdb = pMnode->pSdb; + return sdbGetSize(pSdb, SDB_DB); +} + bool mndIsDnodeOnline(SDnodeObj *pDnode, int64_t curMs) { int64_t interval = TABS(pDnode->lastAccessTime - curMs); if (interval > 5000 * (int64_t)tsStatusInterval) { @@ -589,6 +594,107 @@ _OVER: return code; } +static int32_t mndRestoreDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, int8_t restoreType) { + int32_t code = -1; + SSdbRaw *pRaw = NULL; + STrans *pTrans = NULL; + + pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_GLOBAL, pReq, "restore-dnode"); + if (pTrans == NULL) goto _OVER; + + mndTransSetSerial(pTrans); + + mInfo("trans:%d, used to restore dnode:%s", pTrans->id, pDnode->ep); + + if (mndTrancCheckConflict(pMnode, pTrans) != 0) goto _OVER; + + if(restoreType == RESTORE_TYPE__ALL || restoreType == RESTORE_TYPE__MNODE) + { + SMnodeObj *mnodeObj = mndAcquireMnode(pMnode, pDnode->id); + if(mnodeObj == NULL){ + mError("trans:%d, no mnode exist on dnode:%s", pTrans->id, pDnode->ep); + } + else + { + SMnodeObj newMnodeObj = {0}; + newMnodeObj.id = pDnode->id; + newMnodeObj.createdTime = taosGetTimestampMs(); + newMnodeObj.updateTime = newMnodeObj.createdTime; + newMnodeObj.role = TAOS_SYNC_ROLE_LEARNER; + newMnodeObj.lastIndex = pMnode->applied; + if (mndSetRestoreCreateMnodeRedoActions(pMnode, pTrans, pDnode, &newMnodeObj) != 0) goto _OVER; + if (mndSetRestoreCreateMnodeRedoLogs(pMnode, pTrans, &newMnodeObj) != 0) goto _OVER; + + SMnodeObj mnodeLeaderObj = {0}; + mnodeLeaderObj.id = pDnode->id; + mnodeLeaderObj.createdTime = taosGetTimestampMs(); + mnodeLeaderObj.updateTime = mnodeLeaderObj.createdTime; + mnodeLeaderObj.role = TAOS_SYNC_ROLE_VOTER; + mnodeLeaderObj.lastIndex = pMnode->applied + 1; + if (mndSetRestoreAlterMnodeTypeRedoActions(pMnode, pTrans, pDnode, &mnodeLeaderObj) != 0) goto _OVER; + if (mndSetRestoreCreateMnodeRedoLogs(pMnode, pTrans, &mnodeLeaderObj) != 0) goto _OVER; + + if (mndSetCreateMnodeCommitLogs(pMnode, pTrans, &mnodeLeaderObj) != 0) goto _OVER; + + mndReleaseMnode(pMnode, mnodeObj); + } + } + + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + + if(restoreType == RESTORE_TYPE__ALL || restoreType == RESTORE_TYPE__VNODE){ + while (1) { + SVgObj *pVgroup = NULL; + pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); + if (pIter == NULL) break; + + if (mndVgroupInDnode(pVgroup, pDnode->id)) { + SDbObj *db = mndAcquireDb(pMnode, pVgroup->dbName); + if(db == NULL){ + sdbCancelFetch(pSdb, pIter); + sdbRelease(pSdb, pVgroup); + goto _OVER; + } + if (mndBuildRestoreAlterVgroupAction(pMnode, pTrans, db, pVgroup, pDnode) != 0) { + sdbCancelFetch(pSdb, pIter); + mndReleaseDb(pMnode, db); + sdbRelease(pSdb, pVgroup); + goto _OVER; + } + mndReleaseDb(pMnode, db); + } + + sdbRelease(pSdb, pVgroup); + } + } + + if(restoreType == RESTORE_TYPE__ALL || restoreType == RESTORE_TYPE__QNODE){ + pIter = NULL; + while (1) { + SQnodeObj *pQnode = NULL; + pIter = sdbFetch(pSdb, SDB_QNODE, pIter, (void **)&pQnode); + if (pIter == NULL) break; + + if (mndQnodeInDnode(pQnode, pDnode->id)) { + if (mndSetCreateQnodeCommitLogs(pTrans, pQnode) != 0) goto _OVER; + if (mndSetCreateQnodeRedoActions(pTrans, pDnode, pQnode) != 0) goto _OVER; + } + + sdbRelease(pSdb, pQnode); + } + } + + if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER; + code = 0; + +_OVER: + + mndTransDrop(pTrans); + sdbFreeRaw(pRaw); + return code; +} + static int32_t mndProcessDnodeListReq(SRpcMsg *pReq) { SMnode *pMnode = pReq->info.node; SSdb *pSdb = pMnode->pSdb; @@ -745,6 +851,63 @@ _OVER: return code; } +extern int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq); + +int32_t mndProcessRestoreDnodeReq(SRpcMsg *pReq){ + return mndProcessRestoreDnodeReqImpl(pReq); +} + +#ifndef TD_ENTERPRISE +int32_t mndProcessRestoreDnodeReqImpl(SRpcMsg *pReq){ + return 0; +} +#endif + +/* +static int32_t mndProcessDropDnodeReq(SRpcMsg *pReq) { + SMnode *pMnode = pReq->info.node; + int32_t code = -1; + SDnodeObj *pDnode = NULL; + SMnodeObj *pMObj = NULL; + SQnodeObj *pQObj = NULL; + SSnodeObj *pSObj = NULL; + SRestoreDnodeReq restoreReq = {0}; + + if (tDeserializeSRestoreDnodeReq(pReq->pCont, pReq->contLen, &restoreReq) != 0) { + terrno = TSDB_CODE_INVALID_MSG; + goto _OVER; + } + + mInfo("dnode:%d, start to restore, ep:%s:%d", restoreReq.dnodeId, restoreReq.fqdn, restoreReq.port); + if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_MNODE) != 0) { + goto _OVER; + } + + pDnode = mndAcquireDnode(pMnode, restoreReq.dnodeId); + if (pDnode == NULL) { + int32_t err = terrno; + char ep[TSDB_EP_LEN + 1] = {0}; + snprintf(ep, sizeof(ep), restoreReq.fqdn, restoreReq.port); + pDnode = mndAcquireDnodeByEp(pMnode, ep); + if (pDnode == NULL) { + terrno = err; + goto _OVER; + } + } + + code = mndRestoreDnode(pMnode, pReq, pDnode, restoreReq.restoreType); + if (code == 0) code = TSDB_CODE_ACTION_IN_PROGRESS; + +_OVER: + if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) { + mError("dnode:%d, failed to restore since %s", restoreReq.dnodeId, terrstr()); + } + + mndReleaseDnode(pMnode, pDnode); + return code; +} +*/ + static int32_t mndDropDnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMnodeObj *pMObj, SQnodeObj *pQObj, SSnodeObj *pSObj, int32_t numOfVnodes, bool force) { int32_t code = -1; diff --git a/source/dnode/mnode/impl/src/mndMnode.c b/source/dnode/mnode/impl/src/mndMnode.c index 5e3476859a..19c3d59167 100644 --- a/source/dnode/mnode/impl/src/mndMnode.c +++ b/source/dnode/mnode/impl/src/mndMnode.c @@ -275,6 +275,14 @@ static int32_t mndSetCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeO return 0; } +int32_t mndSetRestoreCreateMnodeRedoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { + SSdbRaw *pRedoRaw = mndMnodeActionEncode(pObj); + if (pRedoRaw == NULL) return -1; + if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1; + if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1; + return 0; +} + static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { SSdbRaw *pUndoRaw = mndMnodeActionEncode(pObj); if (pUndoRaw == NULL) return -1; @@ -283,7 +291,7 @@ static int32_t mndSetCreateMnodeUndoLogs(SMnode *pMnode, STrans *pTrans, SMnodeO return 0; } -static int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { +int32_t mndSetCreateMnodeCommitLogs(SMnode *pMnode, STrans *pTrans, SMnodeObj *pObj) { SSdbRaw *pCommitRaw = mndMnodeActionEncode(pObj); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; @@ -421,6 +429,55 @@ static int32_t mndSetCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDno return 0; } +int32_t mndSetRestoreCreateMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SDCreateMnodeReq createReq = {0}; + SEpSet createEpset = {0}; + + while (1) { + SMnodeObj *pMObj = NULL; + pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); + if (pIter == NULL) break; + + if(pMObj->id == pDnode->id) { + sdbRelease(pSdb, pMObj); + continue; + } + + if(pMObj->role == TAOS_SYNC_ROLE_VOTER){ + createReq.replicas[createReq.replica].id = pMObj->id; + createReq.replicas[createReq.replica].port = pMObj->pDnode->port; + memcpy(createReq.replicas[createReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); + createReq.replica++; + } + else{ + createReq.learnerReplicas[createReq.learnerReplica].id = pMObj->id; + createReq.learnerReplicas[createReq.learnerReplica].port = pMObj->pDnode->port; + memcpy(createReq.learnerReplicas[createReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); + createReq.learnerReplica++; + } + + sdbRelease(pSdb, pMObj); + } + + createReq.learnerReplicas[createReq.learnerReplica].id = pDnode->id; + createReq.learnerReplicas[createReq.learnerReplica].port = pDnode->port; + memcpy(createReq.learnerReplicas[createReq.learnerReplica].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + createReq.learnerReplica++; + + createReq.lastIndex = pObj->lastIndex; + + createEpset.inUse = 0; + createEpset.numOfEps = 1; + createEpset.eps[0].port = pDnode->port; + memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + + if (mndBuildCreateMnodeRedoAction(pTrans, &createReq, &createEpset) != 0) return -1; + + return 0; +} + static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; @@ -465,6 +522,55 @@ static int32_t mndSetAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, S return 0; } +int32_t mndSetRestoreAlterMnodeTypeRedoActions(SMnode *pMnode, STrans *pTrans, SDnodeObj *pDnode, SMnodeObj *pObj) { + SSdb *pSdb = pMnode->pSdb; + void *pIter = NULL; + SDAlterMnodeTypeReq alterReq = {0}; + SEpSet createEpset = {0}; + + while (1) { + SMnodeObj *pMObj = NULL; + pIter = sdbFetch(pSdb, SDB_MNODE, pIter, (void **)&pMObj); + if (pIter == NULL) break; + + if(pMObj->id == pDnode->id) { + sdbRelease(pSdb, pMObj); + continue; + } + + if(pMObj->role == TAOS_SYNC_ROLE_VOTER){ + alterReq.replicas[alterReq.replica].id = pMObj->id; + alterReq.replicas[alterReq.replica].port = pMObj->pDnode->port; + memcpy(alterReq.replicas[alterReq.replica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); + alterReq.replica++; + } + else{ + alterReq.learnerReplicas[alterReq.learnerReplica].id = pMObj->id; + alterReq.learnerReplicas[alterReq.learnerReplica].port = pMObj->pDnode->port; + memcpy(alterReq.learnerReplicas[alterReq.learnerReplica].fqdn, pMObj->pDnode->fqdn, TSDB_FQDN_LEN); + alterReq.learnerReplica++; + } + + sdbRelease(pSdb, pMObj); + } + + alterReq.replicas[alterReq.replica].id = pDnode->id; + alterReq.replicas[alterReq.replica].port = pDnode->port; + memcpy(alterReq.replicas[alterReq.replica].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + alterReq.replica++; + + alterReq.lastIndex = pObj->lastIndex; + + createEpset.inUse = 0; + createEpset.numOfEps = 1; + createEpset.eps[0].port = pDnode->port; + memcpy(createEpset.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN); + + if (mndBuildAlterMnodeTypeRedoAction(pTrans, &alterReq, &createEpset) != 0) return -1; + + return 0; +} + static int32_t mndCreateMnode(SMnode *pMnode, SRpcMsg *pReq, SDnodeObj *pDnode, SMCreateMnodeReq *pCreate) { int32_t code = -1; diff --git a/source/dnode/mnode/impl/src/mndQnode.c b/source/dnode/mnode/impl/src/mndQnode.c index a8b2d5f4bb..b5c9ce1f65 100644 --- a/source/dnode/mnode/impl/src/mndQnode.c +++ b/source/dnode/mnode/impl/src/mndQnode.c @@ -180,7 +180,7 @@ static int32_t mndSetCreateQnodeUndoLogs(STrans *pTrans, SQnodeObj *pObj) { return 0; } -static int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) { +int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) { SSdbRaw *pCommitRaw = mndQnodeActionEncode(pObj); if (pCommitRaw == NULL) return -1; if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1; @@ -188,7 +188,11 @@ static int32_t mndSetCreateQnodeCommitLogs(STrans *pTrans, SQnodeObj *pObj) { return 0; } -static int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) { +bool mndQnodeInDnode(SQnodeObj *pQnode, int32_t dnodeId) { + return pQnode->pDnode->id == dnodeId; +} + +int32_t mndSetCreateQnodeRedoActions(STrans *pTrans, SDnodeObj *pDnode, SQnodeObj *pObj) { SDCreateQnodeReq createReq = {0}; createReq.dnodeId = pDnode->id; diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 72a7ed77a4..2afae04551 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -1155,6 +1155,28 @@ int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVg return 0; } +int32_t mndRestoreAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SDnodeObj *pDnode) { + STransAction action = {0}; + + action.epSet = mndGetDnodeEpset(pDnode); + + int32_t contLen = 0; + void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen); + if (pReq == NULL) return -1; + + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_DND_CREATE_VNODE; + action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + int32_t mndAddAlterVnodeConfirmAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { STransAction action = {0}; action.epSet = mndGetVgroupEpset(pMnode, pVgroup); @@ -1274,6 +1296,29 @@ int32_t mndAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, return 0; } +int32_t mndRestoreAddAlterVnodeTypeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, + SDnodeObj *pDnode) { + STransAction action = {0}; + action.epSet = mndGetDnodeEpset(pDnode); + + int32_t contLen = 0; + void *pReq = mndBuildAlterVnodeReplicaReq(pMnode, pDb, pVgroup, pDnode->id, &contLen); + if (pReq == NULL) return -1; + + action.pCont = pReq; + action.contLen = contLen; + action.msgType = TDMT_DND_ALTER_VNODE_TYPE; + action.acceptableCode = TSDB_CODE_VND_ALREADY_IS_VOTER; + action.retryCode = TSDB_CODE_VND_NOT_CATCH_UP; + + if (mndTransAppendRedoAction(pTrans, &action) != 0) { + taosMemoryFree(pReq); + return -1; + } + + return 0; +} + static int32_t mndAddDisableVnodeWriteAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, int32_t dnodeId) { SDnodeObj *pDnode = mndAcquireDnode(pMnode, dnodeId); @@ -2113,6 +2158,55 @@ int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb return 0; } +int32_t mndBuildRestoreAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *db, SVgObj *pVgroup, + SDnodeObj *pDnode) { + SVgObj newVgroup = {0}; + memcpy(&newVgroup, pVgroup, sizeof(SVgObj)); + + mInfo("db:%s, vgId:%d, restore vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId, + pVgroup->vnodeGid[0].dnodeId); + + if(newVgroup.replica == 1){ + int selected = 0; + for(int i = 0; i < newVgroup.replica; i++){ + newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER; + if(newVgroup.vnodeGid[i].dnodeId == pDnode->id){ + selected = i; + } + } + if (mndAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, &newVgroup.vnodeGid[selected]) != 0) return -1; + } + else if(newVgroup.replica == 3){ + for(int i = 0; i < newVgroup.replica; i++){ + if(newVgroup.vnodeGid[i].dnodeId == pDnode->id){ + newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_LEARNER; + } + else{ + newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER; + } + } + if (mndRestoreAddCreateVnodeAction(pMnode, pTrans, db, &newVgroup, pDnode) != 0) return -1; + + for(int i = 0; i < newVgroup.replica; i++){ + newVgroup.vnodeGid[i].nodeRole = TAOS_SYNC_ROLE_VOTER; + if(newVgroup.vnodeGid[i].dnodeId == pDnode->id){ + } + } + if (mndRestoreAddAlterVnodeTypeAction(pMnode, pTrans, db, &newVgroup, pDnode) != 0) + return -1; + } + + SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup); + if (pVgRaw == NULL) return -1; + if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) { + sdbFreeRaw(pVgRaw); + return -1; + } + (void)sdbSetRawStatus(pVgRaw, SDB_STATUS_READY); + + return 0; +} + static int32_t mndAddAdjustVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) { return 0; } @@ -2437,6 +2531,13 @@ _OVER: bool mndVgroupInDb(SVgObj *pVgroup, int64_t dbUid) { return !pVgroup->isTsma && pVgroup->dbUid == dbUid; } +bool mndVgroupInDnode(SVgObj *pVgroup, int32_t dnodeId) { + for(int i = 0; i < pVgroup->replica; i++){ + if(pVgroup->vnodeGid[i].dnodeId == dnodeId) return true; + } + return false; +} + static void *mndBuildCompactVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, int64_t compactTs, STimeWindow tw) { SCompactVnodeReq compactReq = {0}; diff --git a/source/libs/tfs/src/tfs.c b/source/libs/tfs/src/tfs.c index 86b36b9b12..bedd14353f 100644 --- a/source/libs/tfs/src/tfs.c +++ b/source/libs/tfs/src/tfs.c @@ -283,6 +283,14 @@ int32_t tfsMkdir(STfs *pTfs, const char *rname) { return 0; } +bool tfsDirExistAt(STfs *pTfs, const char *rname, SDiskID diskId) { + STfsDisk *pDisk = TFS_DISK_AT(pTfs, diskId); + char aname[TMPNAME_LEN]; + + snprintf(aname, TMPNAME_LEN, "%s%s%s", pDisk->path, TD_DIRSEP, rname); + return taosDirExist(aname); +} + int32_t tfsRmdir(STfs *pTfs, const char *rname) { if (rname[0] == 0) { return 0; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index 6fc16ad4a9..f7a7ae30f4 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -335,6 +335,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_VND_NO_AVAIL_BUFPOOL, "No availabe buffer po TAOS_DEFINE_ERROR(TSDB_CODE_VND_STOPPED, "Vnode stopped") TAOS_DEFINE_ERROR(TSDB_CODE_VND_DUP_REQUEST, "Duplicate write request") TAOS_DEFINE_ERROR(TSDB_CODE_VND_QUERY_BUSY, "Query busy") +TAOS_DEFINE_ERROR(TSDB_CODE_VND_DIR_ALREADY_EXIST, "Vnode directory already exist") // tsdb TAOS_DEFINE_ERROR(TSDB_CODE_TDB_INVALID_TABLE_ID, "Invalid table ID")