From adf7ea29cbfe142391e3113d6342e508973135ce Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Tue, 29 Oct 2024 13:09:39 +0800 Subject: [PATCH] feat: alloc dnode from input dnode list --- include/common/tmsg.h | 1 + source/common/src/tmsg.c | 7 +++++ source/dnode/mnode/impl/inc/mndVgroup.h | 2 +- source/dnode/mnode/impl/src/mndDb.c | 41 ++++++++++++++++++------- source/dnode/mnode/impl/src/mndVgroup.c | 28 ++++++++++++----- source/libs/parser/inc/sql.y | 1 + source/libs/parser/src/parTranslater.c | 2 ++ 7 files changed, 63 insertions(+), 19 deletions(-) diff --git a/include/common/tmsg.h b/include/common/tmsg.h index d14facec25..8bba723bf1 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1369,6 +1369,7 @@ typedef struct { int32_t sqlLen; char* sql; int8_t withArbitrator; + char dnodeListStr[TSDB_DNODE_LIST_LEN]; } SAlterDbReq; int32_t tSerializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq); diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 6cbf3665a7..3cc8ea4250 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4017,6 +4017,8 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) { ENCODESQL(); TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->withArbitrator)); + TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->dnodeListStr)); + tEndEncode(&encoder); _exit: @@ -4084,6 +4086,11 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) { if (!tDecodeIsEnd(&decoder)) { TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->withArbitrator)); } + + if (!tDecodeIsEnd(&decoder)) { + TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->dnodeListStr)); + } + tEndDecode(&decoder); _exit: diff --git a/source/dnode/mnode/impl/inc/mndVgroup.h b/source/dnode/mnode/impl/inc/mndVgroup.h index 01596094eb..a8a806e497 100644 --- a/source/dnode/mnode/impl/inc/mndVgroup.h +++ b/source/dnode/mnode/impl/inc/mndVgroup.h @@ -35,7 +35,7 @@ void mndSortVnodeGid(SVgObj *pVgroup); int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId); int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup); -SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId); +SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId, SArray *dnodeList); int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup); int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList); int32_t mndAddNewVgPrepareAction(SMnode *, STrans *pTrans, SVgObj *pVg); diff --git a/source/dnode/mnode/impl/src/mndDb.c b/source/dnode/mnode/impl/src/mndDb.c index 409e6ff848..f01f6b0a6f 100644 --- a/source/dnode/mnode/impl/src/mndDb.c +++ b/source/dnode/mnode/impl/src/mndDb.c @@ -883,12 +883,20 @@ static void mndBuildAuditDetailInt64(char *detail, char *tmp, char *format, int6 } } -static int32_t mndCheckDbDnodeList(SMnode *pMnode, SCreateDbReq *pReq, SArray *dnodeList) { - if (pReq->dnodeListStr[0] == 0) return 0; +static int32_t mndCheckDbDnodeList(SMnode *pMnode, char *db, char *dnodeListStr, SArray *dnodeList) { + if (dnodeListStr[0] == 0) return 0; - mInfo("db:%s, dnode list is %s", pReq->db, pReq->dnodeListStr); + mInfo("db:%s, dnode list is %s", db, dnodeListStr); - char *pos = pReq->dnodeListStr; + int32_t len = strlen(dnodeListStr); + for (int32_t i = 0; i < len; ++i) { + if ((dnodeListStr[i] < '0' || dnodeListStr[i] > '9') && dnodeListStr[i] != ',') { + terrno = TSDB_CODE_MND_INVALID_DNODE_LIST_FMT; + return terrno; + } + } + + char *pos = dnodeListStr; while (pos != NULL) { if (pos[0] < '0' || pos[0] > '9') { terrno = TSDB_CODE_MND_INVALID_DNODE_LIST_FMT; @@ -904,7 +912,7 @@ static int32_t mndCheckDbDnodeList(SMnode *pMnode, SCreateDbReq *pReq, SArray *d return terrno; } } else { - mError("db:%s, invalid dnode:%d from pos:%s", pReq->db, dnodeId, pos); + mError("db:%s, invalid dnode:%d from pos:%s", db, dnodeId, pos); terrno = TSDB_CODE_MND_DNODE_NOT_EXIST; return terrno; } @@ -1027,7 +1035,7 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) { TAOS_CHECK_GOTO(mndCheckDbEncryptKey(pMnode, &createReq), &lino, _OVER); - TAOS_CHECK_GOTO(mndCheckDbDnodeList(pMnode, &createReq, dnodeList), &lino, _OVER); + TAOS_CHECK_GOTO(mndCheckDbDnodeList(pMnode, createReq.db, createReq.dnodeListStr, dnodeList), &lino, _OVER); TAOS_CHECK_GOTO(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser), &lino, _OVER); @@ -1218,12 +1226,13 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p TAOS_RETURN(code); } -static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) { +static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb, + SArray *dnodeList) { int32_t code = 0, lino = 0; SSdb *pSdb = pMnode->pSdb; void *pIter = NULL; SVgObj *pVgroup = NULL; - SArray *pArray = mndBuildDnodesArray(pMnode, 0); + SArray *pArray = mndBuildDnodesArray(pMnode, 0, dnodeList); while (1) { pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup); @@ -1262,7 +1271,7 @@ _err: TAOS_RETURN(code); } -static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *pNew) { +static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *pNew, SArray *dnodeList) { int32_t code = -1; STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "alter-db"); if (pTrans == NULL) { @@ -1277,7 +1286,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p TAOS_CHECK_GOTO(mndSetAlterDbPrepareLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER); TAOS_CHECK_GOTO(mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER); - TAOS_CHECK_GOTO(mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew), NULL, _OVER); + TAOS_CHECK_GOTO(mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew, dnodeList), NULL, _OVER); TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER); code = 0; @@ -1292,6 +1301,13 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) { SDbObj *pDb = NULL; SAlterDbReq alterReq = {0}; SDbObj dbObj = {0}; + SArray *dnodeList = NULL; + + dnodeList = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(int32_t)); + if (dnodeList == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _OVER; + } TAOS_CHECK_GOTO(tDeserializeSAlterDbReq(pReq->pCont, pReq->contLen, &alterReq), NULL, _OVER); @@ -1334,9 +1350,11 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) { TAOS_CHECK_GOTO(mndCheckInChangeDbCfg(pMnode, &pDb->cfg, &dbObj.cfg), NULL, _OVER); + TAOS_CHECK_GOTO(mndCheckDbDnodeList(pMnode, alterReq.db, alterReq.dnodeListStr, dnodeList), NULL, _OVER); + dbObj.cfgVersion++; dbObj.updateTime = taosGetTimestampMs(); - code = mndAlterDb(pMnode, pReq, pDb, &dbObj); + code = mndAlterDb(pMnode, pReq, pDb, &dbObj, dnodeList); if (dbObj.cfg.replications != pDb->cfg.replications) { // return quickly, operation executed asynchronously @@ -1360,6 +1378,7 @@ _OVER: mndReleaseDb(pMnode, pDb); taosArrayDestroy(dbObj.cfg.pRetensions); tFreeSAlterDbReq(&alterReq); + taosArrayDestroy(dnodeList); TAOS_RETURN(code); } diff --git a/source/dnode/mnode/impl/src/mndVgroup.c b/source/dnode/mnode/impl/src/mndVgroup.c index 2076b49cff..d8208a2d73 100644 --- a/source/dnode/mnode/impl/src/mndVgroup.c +++ b/source/dnode/mnode/impl/src/mndVgroup.c @@ -717,11 +717,25 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2 SDnodeObj *pDnode = pObj; SArray *pArray = p1; int32_t exceptDnodeId = *(int32_t *)p2; + SArray *dnodeList = p3; if (exceptDnodeId == pDnode->id) { return true; } + if (dnodeList != NULL) { + bool inDnodeList = false; + for (int32_t index = 0; index < taosArrayGetSize(dnodeList); ++index) { + int32_t dnodeId = *(int32_t *)taosArrayGet(dnodeList, index); + if (pDnode->id) { + inDnodeList = true; + } + } + if (!inDnodeList) { + return true; + } + } + int64_t curMs = taosGetTimestampMs(); bool online = mndIsDnodeOnline(pDnode, curMs); bool isMnode = mndIsMnode(pMnode, pDnode->id); @@ -741,7 +755,7 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2 return true; } -SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId) { +SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId, SArray *dnodeList) { SSdb *pSdb = pMnode->pSdb; int32_t numOfDnodes = mndGetDnodeSize(pMnode); @@ -752,7 +766,7 @@ SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId) { } sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL); - sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, &exceptDnodeId, NULL); + sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, &exceptDnodeId, dnodeList); mDebug("build %d dnodes array", (int32_t)taosArrayGetSize(pArray)); for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) { @@ -845,7 +859,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) { int32_t code = 0; - SArray *pArray = mndBuildDnodesArray(pMnode, 0); + SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL); if (pArray == NULL) { code = TSDB_CODE_MND_RETURN_VALUE_NULL; if (terrno != 0) code = terrno; @@ -879,7 +893,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray * goto _OVER; } - pArray = mndBuildDnodesArray(pMnode, 0); + pArray = mndBuildDnodesArray(pMnode, 0, dnodeList); if (pArray == NULL) { code = TSDB_CODE_MND_RETURN_VALUE_NULL; if (terrno != 0) code = terrno; @@ -2062,7 +2076,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force, bool unsafe) { int32_t code = 0; - SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId); + SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId, NULL); if (pArray == NULL) { code = TSDB_CODE_MND_RETURN_VALUE_NULL; if (terrno != 0) code = terrno; @@ -3140,7 +3154,7 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro int32_t code = -1; STrans *pTrans = NULL; SDbObj dbObj = {0}; - SArray *pArray = mndBuildDnodesArray(pMnode, 0); + SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL); int32_t numOfStreams = 0; if ((code = mndGetNumOfStreams(pMnode, pDb->name, &numOfStreams)) != 0) { @@ -3506,7 +3520,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) { sdbRelease(pMnode->pSdb, pDnode); } - pArray = mndBuildDnodesArray(pMnode, 0); + pArray = mndBuildDnodesArray(pMnode, 0, NULL); if (pArray == NULL) { code = TSDB_CODE_MND_RETURN_VALUE_NULL; if (terrno != 0) code = terrno; diff --git a/source/libs/parser/inc/sql.y b/source/libs/parser/inc/sql.y index 635e9f570f..a9b965b4fa 100644 --- a/source/libs/parser/inc/sql.y +++ b/source/libs/parser/inc/sql.y @@ -322,6 +322,7 @@ alter_db_option(A) ::= S3_KEEPLOCAL NK_VARIABLE(B). alter_db_option(A) ::= S3_COMPACT NK_INTEGER(B). { A.type = DB_OPTION_S3_COMPACT, A.val = B; } alter_db_option(A) ::= KEEP_TIME_OFFSET NK_INTEGER(B). { A.type = DB_OPTION_KEEP_TIME_OFFSET; A.val = B; } alter_db_option(A) ::= ENCRYPT_ALGORITHM NK_STRING(B). { A.type = DB_OPTION_ENCRYPT_ALGORITHM; A.val = B; } +alter_db_option(A) ::= DNODES NK_STRING(B). { A.type = DB_OPTION_DNODES; A.val = B; } %type integer_list { SNodeList* } %destructor integer_list { nodesDestroyList($$); } diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index dd3cadc104..d6a62b2383 100755 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -8240,6 +8240,8 @@ static int32_t buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStm pReq->s3KeepLocal = pStmt->pOptions->s3KeepLocal; pReq->s3Compact = pStmt->pOptions->s3Compact; pReq->withArbitrator = pStmt->pOptions->withArbitrator; + tstrncpy(pReq->dnodeListStr, pStmt->pOptions->dnodeListStr, TSDB_DNODE_LIST_LEN); + return code; }