feat: alloc dnode from input dnode list
This commit is contained in:
parent
59f00727e5
commit
adf7ea29cb
|
@ -1369,6 +1369,7 @@ typedef struct {
|
|||
int32_t sqlLen;
|
||||
char* sql;
|
||||
int8_t withArbitrator;
|
||||
char dnodeListStr[TSDB_DNODE_LIST_LEN];
|
||||
} SAlterDbReq;
|
||||
|
||||
int32_t tSerializeSAlterDbReq(void* buf, int32_t bufLen, SAlterDbReq* pReq);
|
||||
|
|
|
@ -4017,6 +4017,8 @@ int32_t tSerializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
|||
|
||||
ENCODESQL();
|
||||
TAOS_CHECK_EXIT(tEncodeI8(&encoder, pReq->withArbitrator));
|
||||
TAOS_CHECK_EXIT(tEncodeCStr(&encoder, pReq->dnodeListStr));
|
||||
|
||||
tEndEncode(&encoder);
|
||||
|
||||
_exit:
|
||||
|
@ -4084,6 +4086,11 @@ int32_t tDeserializeSAlterDbReq(void *buf, int32_t bufLen, SAlterDbReq *pReq) {
|
|||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeI8(&decoder, &pReq->withArbitrator));
|
||||
}
|
||||
|
||||
if (!tDecodeIsEnd(&decoder)) {
|
||||
TAOS_CHECK_EXIT(tDecodeCStrTo(&decoder, pReq->dnodeListStr));
|
||||
}
|
||||
|
||||
tEndDecode(&decoder);
|
||||
|
||||
_exit:
|
||||
|
|
|
@ -35,7 +35,7 @@ void mndSortVnodeGid(SVgObj *pVgroup);
|
|||
int64_t mndGetVnodesMemory(SMnode *pMnode, int32_t dnodeId);
|
||||
int64_t mndGetVgroupMemory(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup);
|
||||
|
||||
SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId);
|
||||
SArray *mndBuildDnodesArray(SMnode *, int32_t exceptDnodeId, SArray *dnodeList);
|
||||
int32_t mndAllocSmaVgroup(SMnode *, SDbObj *pDb, SVgObj *pVgroup);
|
||||
int32_t mndAllocVgroup(SMnode *, SDbObj *pDb, SVgObj **ppVgroups, SArray *dnodeList);
|
||||
int32_t mndAddNewVgPrepareAction(SMnode *, STrans *pTrans, SVgObj *pVg);
|
||||
|
|
|
@ -883,12 +883,20 @@ static void mndBuildAuditDetailInt64(char *detail, char *tmp, char *format, int6
|
|||
}
|
||||
}
|
||||
|
||||
static int32_t mndCheckDbDnodeList(SMnode *pMnode, SCreateDbReq *pReq, SArray *dnodeList) {
|
||||
if (pReq->dnodeListStr[0] == 0) return 0;
|
||||
static int32_t mndCheckDbDnodeList(SMnode *pMnode, char *db, char *dnodeListStr, SArray *dnodeList) {
|
||||
if (dnodeListStr[0] == 0) return 0;
|
||||
|
||||
mInfo("db:%s, dnode list is %s", pReq->db, pReq->dnodeListStr);
|
||||
mInfo("db:%s, dnode list is %s", db, dnodeListStr);
|
||||
|
||||
char *pos = pReq->dnodeListStr;
|
||||
int32_t len = strlen(dnodeListStr);
|
||||
for (int32_t i = 0; i < len; ++i) {
|
||||
if ((dnodeListStr[i] < '0' || dnodeListStr[i] > '9') && dnodeListStr[i] != ',') {
|
||||
terrno = TSDB_CODE_MND_INVALID_DNODE_LIST_FMT;
|
||||
return terrno;
|
||||
}
|
||||
}
|
||||
|
||||
char *pos = dnodeListStr;
|
||||
while (pos != NULL) {
|
||||
if (pos[0] < '0' || pos[0] > '9') {
|
||||
terrno = TSDB_CODE_MND_INVALID_DNODE_LIST_FMT;
|
||||
|
@ -904,7 +912,7 @@ static int32_t mndCheckDbDnodeList(SMnode *pMnode, SCreateDbReq *pReq, SArray *d
|
|||
return terrno;
|
||||
}
|
||||
} else {
|
||||
mError("db:%s, invalid dnode:%d from pos:%s", pReq->db, dnodeId, pos);
|
||||
mError("db:%s, invalid dnode:%d from pos:%s", db, dnodeId, pos);
|
||||
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
|
||||
return terrno;
|
||||
}
|
||||
|
@ -1027,7 +1035,7 @@ static int32_t mndProcessCreateDbReq(SRpcMsg *pReq) {
|
|||
|
||||
TAOS_CHECK_GOTO(mndCheckDbEncryptKey(pMnode, &createReq), &lino, _OVER);
|
||||
|
||||
TAOS_CHECK_GOTO(mndCheckDbDnodeList(pMnode, &createReq, dnodeList), &lino, _OVER);
|
||||
TAOS_CHECK_GOTO(mndCheckDbDnodeList(pMnode, createReq.db, createReq.dnodeListStr, dnodeList), &lino, _OVER);
|
||||
|
||||
TAOS_CHECK_GOTO(mndAcquireUser(pMnode, pReq->info.conn.user, &pUser), &lino, _OVER);
|
||||
|
||||
|
@ -1218,12 +1226,13 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb) {
|
||||
static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *pOldDb, SDbObj *pNewDb,
|
||||
SArray *dnodeList) {
|
||||
int32_t code = 0, lino = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
void *pIter = NULL;
|
||||
SVgObj *pVgroup = NULL;
|
||||
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
|
||||
SArray *pArray = mndBuildDnodesArray(pMnode, 0, dnodeList);
|
||||
|
||||
while (1) {
|
||||
pIter = sdbFetch(pSdb, SDB_VGROUP, pIter, (void **)&pVgroup);
|
||||
|
@ -1262,7 +1271,7 @@ _err:
|
|||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
|
||||
static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *pNew, SArray *dnodeList) {
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq, "alter-db");
|
||||
if (pTrans == NULL) {
|
||||
|
@ -1277,7 +1286,7 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
|
|||
|
||||
TAOS_CHECK_GOTO(mndSetAlterDbPrepareLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew, dnodeList), NULL, _OVER);
|
||||
TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);
|
||||
code = 0;
|
||||
|
||||
|
@ -1292,6 +1301,13 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
|
|||
SDbObj *pDb = NULL;
|
||||
SAlterDbReq alterReq = {0};
|
||||
SDbObj dbObj = {0};
|
||||
SArray *dnodeList = NULL;
|
||||
|
||||
dnodeList = taosArrayInit(mndGetDnodeSize(pMnode), sizeof(int32_t));
|
||||
if (dnodeList == NULL) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
TAOS_CHECK_GOTO(tDeserializeSAlterDbReq(pReq->pCont, pReq->contLen, &alterReq), NULL, _OVER);
|
||||
|
||||
|
@ -1334,9 +1350,11 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
|
|||
|
||||
TAOS_CHECK_GOTO(mndCheckInChangeDbCfg(pMnode, &pDb->cfg, &dbObj.cfg), NULL, _OVER);
|
||||
|
||||
TAOS_CHECK_GOTO(mndCheckDbDnodeList(pMnode, alterReq.db, alterReq.dnodeListStr, dnodeList), NULL, _OVER);
|
||||
|
||||
dbObj.cfgVersion++;
|
||||
dbObj.updateTime = taosGetTimestampMs();
|
||||
code = mndAlterDb(pMnode, pReq, pDb, &dbObj);
|
||||
code = mndAlterDb(pMnode, pReq, pDb, &dbObj, dnodeList);
|
||||
|
||||
if (dbObj.cfg.replications != pDb->cfg.replications) {
|
||||
// return quickly, operation executed asynchronously
|
||||
|
@ -1360,6 +1378,7 @@ _OVER:
|
|||
mndReleaseDb(pMnode, pDb);
|
||||
taosArrayDestroy(dbObj.cfg.pRetensions);
|
||||
tFreeSAlterDbReq(&alterReq);
|
||||
taosArrayDestroy(dnodeList);
|
||||
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
|
|
@ -717,11 +717,25 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
|
|||
SDnodeObj *pDnode = pObj;
|
||||
SArray *pArray = p1;
|
||||
int32_t exceptDnodeId = *(int32_t *)p2;
|
||||
SArray *dnodeList = p3;
|
||||
|
||||
if (exceptDnodeId == pDnode->id) {
|
||||
return true;
|
||||
}
|
||||
|
||||
if (dnodeList != NULL) {
|
||||
bool inDnodeList = false;
|
||||
for (int32_t index = 0; index < taosArrayGetSize(dnodeList); ++index) {
|
||||
int32_t dnodeId = *(int32_t *)taosArrayGet(dnodeList, index);
|
||||
if (pDnode->id) {
|
||||
inDnodeList = true;
|
||||
}
|
||||
}
|
||||
if (!inDnodeList) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
int64_t curMs = taosGetTimestampMs();
|
||||
bool online = mndIsDnodeOnline(pDnode, curMs);
|
||||
bool isMnode = mndIsMnode(pMnode, pDnode->id);
|
||||
|
@ -741,7 +755,7 @@ static bool mndBuildDnodesArrayFp(SMnode *pMnode, void *pObj, void *p1, void *p2
|
|||
return true;
|
||||
}
|
||||
|
||||
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId) {
|
||||
SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId, SArray *dnodeList) {
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
int32_t numOfDnodes = mndGetDnodeSize(pMnode);
|
||||
|
||||
|
@ -752,7 +766,7 @@ SArray *mndBuildDnodesArray(SMnode *pMnode, int32_t exceptDnodeId) {
|
|||
}
|
||||
|
||||
sdbTraverse(pSdb, SDB_DNODE, mndResetDnodesArrayFp, NULL, NULL, NULL);
|
||||
sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, &exceptDnodeId, NULL);
|
||||
sdbTraverse(pSdb, SDB_DNODE, mndBuildDnodesArrayFp, pArray, &exceptDnodeId, dnodeList);
|
||||
|
||||
mDebug("build %d dnodes array", (int32_t)taosArrayGetSize(pArray));
|
||||
for (int32_t i = 0; i < (int32_t)taosArrayGetSize(pArray); ++i) {
|
||||
|
@ -845,7 +859,7 @@ static int32_t mndGetAvailableDnode(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup
|
|||
|
||||
int32_t mndAllocSmaVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup) {
|
||||
int32_t code = 0;
|
||||
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
|
||||
SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
|
||||
if (pArray == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
|
@ -879,7 +893,7 @@ int32_t mndAllocVgroup(SMnode *pMnode, SDbObj *pDb, SVgObj **ppVgroups, SArray *
|
|||
goto _OVER;
|
||||
}
|
||||
|
||||
pArray = mndBuildDnodesArray(pMnode, 0);
|
||||
pArray = mndBuildDnodesArray(pMnode, 0, dnodeList);
|
||||
if (pArray == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
|
@ -2062,7 +2076,7 @@ int32_t mndSetMoveVgroupInfoToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb,
|
|||
|
||||
int32_t mndSetMoveVgroupsInfoToTrans(SMnode *pMnode, STrans *pTrans, int32_t delDnodeId, bool force, bool unsafe) {
|
||||
int32_t code = 0;
|
||||
SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId);
|
||||
SArray *pArray = mndBuildDnodesArray(pMnode, delDnodeId, NULL);
|
||||
if (pArray == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
|
@ -3140,7 +3154,7 @@ int32_t mndSplitVgroup(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SVgObj *pVgro
|
|||
int32_t code = -1;
|
||||
STrans *pTrans = NULL;
|
||||
SDbObj dbObj = {0};
|
||||
SArray *pArray = mndBuildDnodesArray(pMnode, 0);
|
||||
SArray *pArray = mndBuildDnodesArray(pMnode, 0, NULL);
|
||||
|
||||
int32_t numOfStreams = 0;
|
||||
if ((code = mndGetNumOfStreams(pMnode, pDb->name, &numOfStreams)) != 0) {
|
||||
|
@ -3506,7 +3520,7 @@ static int32_t mndProcessBalanceVgroupMsg(SRpcMsg *pReq) {
|
|||
sdbRelease(pMnode->pSdb, pDnode);
|
||||
}
|
||||
|
||||
pArray = mndBuildDnodesArray(pMnode, 0);
|
||||
pArray = mndBuildDnodesArray(pMnode, 0, NULL);
|
||||
if (pArray == NULL) {
|
||||
code = TSDB_CODE_MND_RETURN_VALUE_NULL;
|
||||
if (terrno != 0) code = terrno;
|
||||
|
|
|
@ -322,6 +322,7 @@ alter_db_option(A) ::= S3_KEEPLOCAL NK_VARIABLE(B).
|
|||
alter_db_option(A) ::= S3_COMPACT NK_INTEGER(B). { A.type = DB_OPTION_S3_COMPACT, A.val = B; }
|
||||
alter_db_option(A) ::= KEEP_TIME_OFFSET NK_INTEGER(B). { A.type = DB_OPTION_KEEP_TIME_OFFSET; A.val = B; }
|
||||
alter_db_option(A) ::= ENCRYPT_ALGORITHM NK_STRING(B). { A.type = DB_OPTION_ENCRYPT_ALGORITHM; A.val = B; }
|
||||
alter_db_option(A) ::= DNODES NK_STRING(B). { A.type = DB_OPTION_DNODES; A.val = B; }
|
||||
|
||||
%type integer_list { SNodeList* }
|
||||
%destructor integer_list { nodesDestroyList($$); }
|
||||
|
|
|
@ -8240,6 +8240,8 @@ static int32_t buildAlterDbReq(STranslateContext* pCxt, SAlterDatabaseStmt* pStm
|
|||
pReq->s3KeepLocal = pStmt->pOptions->s3KeepLocal;
|
||||
pReq->s3Compact = pStmt->pOptions->s3Compact;
|
||||
pReq->withArbitrator = pStmt->pOptions->withArbitrator;
|
||||
tstrncpy(pReq->dnodeListStr, pStmt->pOptions->dnodeListStr, TSDB_DNODE_LIST_LEN);
|
||||
|
||||
return code;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue