fix/TD-30989
This commit is contained in:
parent
c819ac1abe
commit
44efc20ec3
|
@ -54,6 +54,7 @@ int32_t mndInitAcct(SMnode *pMnode) {
|
|||
void mndCleanupAcct(SMnode *pMnode) {}
|
||||
|
||||
static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
|
||||
int32_t code = 0;
|
||||
SAcctObj acctObj = {0};
|
||||
tstrncpy(acctObj.acct, TSDB_DEFAULT_USER, TSDB_USER_LEN);
|
||||
acctObj.createdTime = taosGetTimestampMs();
|
||||
|
@ -76,7 +77,10 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
|
|||
};
|
||||
|
||||
SSdbRaw *pRaw = mndAcctActionEncode(&acctObj);
|
||||
if (pRaw == NULL) return -1;
|
||||
if (pRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||
|
||||
mInfo("acct:%s, will be created when deploying, raw:%p", acctObj.acct, pRaw);
|
||||
|
@ -84,21 +88,24 @@ static int32_t mndCreateDefaultAcct(SMnode *pMnode) {
|
|||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_NOTHING, NULL, "create-acct");
|
||||
if (pTrans == NULL) {
|
||||
sdbFreeRaw(pRaw);
|
||||
mError("acct:%s, failed to create since %s", acctObj.acct, terrstr());
|
||||
return -1;
|
||||
code = terrno;
|
||||
mError("acct:%s, failed to create since %s", acctObj.acct, tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
mInfo("trans:%d, used to create acct:%s", pTrans->id, acctObj.acct);
|
||||
|
||||
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
|
||||
mError("trans:%d, failed to commit redo log since %s", pTrans->id, terrstr());
|
||||
code = mndTransAppendCommitlog(pTrans, pRaw);
|
||||
if (code != 0) {
|
||||
mError("trans:%d, failed to commit redo log since %s", pTrans->id, tstrerror(code));
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
code = mndTransPrepare(pMnode, pTrans);
|
||||
if (code != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, tstrerror(code));
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -216,31 +223,36 @@ static int32_t mndAcctActionUpdate(SSdb *pSdb, SAcctObj *pOld, SAcctObj *pNew) {
|
|||
}
|
||||
|
||||
static int32_t mndProcessCreateAcctReq(SRpcMsg *pReq) {
|
||||
if (mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_CREATE_ACCT) != 0) {
|
||||
return -1;
|
||||
int32_t code = 0;
|
||||
code = mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_CREATE_ACCT);
|
||||
if (code != 0) {
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
mError("failed to process create acct request since %s", terrstr());
|
||||
return -1;
|
||||
code = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
mError("failed to process create acct request since %s", tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndProcessAlterAcctReq(SRpcMsg *pReq) {
|
||||
if (mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_ALTER_ACCT) != 0) {
|
||||
return -1;
|
||||
int32_t code = 0;
|
||||
code = mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_ALTER_ACCT);
|
||||
if (code != 0) {
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
mError("failed to process create acct request since %s", terrstr());
|
||||
return -1;
|
||||
code = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
mError("failed to process create acct request since %s", tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
static int32_t mndProcessDropAcctReq(SRpcMsg *pReq) {
|
||||
if (mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_DROP_ACCT) != 0) {
|
||||
return -1;
|
||||
int32_t code = 0;
|
||||
if (code = mndCheckOperPrivilege(pReq->info.node, pReq->info.conn.user, MND_OPER_DROP_ACCT) != 0) {
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
mError("failed to process create acct request since %s", terrstr());
|
||||
return -1;
|
||||
code = TSDB_CODE_OPS_NOT_SUPPORT;
|
||||
mError("failed to process create acct request since %s", tstrerror(code));
|
||||
TAOS_RETURN(code);
|
||||
}
|
|
@ -57,6 +57,7 @@ static int32_t mndArbCheckToken(const char *token1, const char *token2) {
|
|||
}
|
||||
|
||||
int32_t mndInitArbGroup(SMnode *pMnode) {
|
||||
int32_t code = 0;
|
||||
SSdbTable table = {
|
||||
.sdbType = SDB_ARBGROUP,
|
||||
.keyType = SDB_KEY_INT32,
|
||||
|
@ -78,6 +79,10 @@ int32_t mndInitArbGroup(SMnode *pMnode) {
|
|||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_ARBGROUP, mndCancelGetNextArbGroup);
|
||||
|
||||
arbUpdateHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_ENTRY_LOCK);
|
||||
if (arbUpdateHash == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
return sdbSetTable(pMnode->pSdb, table);
|
||||
}
|
||||
|
@ -248,50 +253,74 @@ _OVER:
|
|||
}
|
||||
|
||||
int32_t mndSetCreateArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) return -1;
|
||||
if (pRedoRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (code = mndTransAppendRedolog(pTrans, pRedoRaw) != 0) TAOS_RETURN(code);
|
||||
if (code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING) != 0) TAOS_RETURN(code);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndSetCreateArbGroupUndoLogs(STrans *pTrans, SArbGroup *pGroup) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pUndoRaw = mndArbGroupActionEncode(pGroup);
|
||||
if (pUndoRaw == NULL) return -1;
|
||||
if (mndTransAppendUndolog(pTrans, pUndoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||
if (pUndoRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (code = mndTransAppendUndolog(pTrans, pUndoRaw) != 0) TAOS_RETURN(code);
|
||||
if (code = sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED) != 0) TAOS_RETURN(code);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndSetCreateArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
|
||||
if (pCommitRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (code = mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) TAOS_RETURN(code);
|
||||
if (code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) TAOS_RETURN(code);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndSetDropArbGroupPrepareLogs(STrans *pTrans, SArbGroup *pGroup) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
|
||||
if (pRedoRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (code = mndTransAppendPrepareLog(pTrans, pRedoRaw) != 0) TAOS_RETURN(code);
|
||||
if (code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) TAOS_RETURN(code);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t mndSetDropArbGroupRedoLogs(STrans *pTrans, SArbGroup *pGroup) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pRedoRaw = mndArbGroupActionEncode(pGroup);
|
||||
if (pRedoRaw == NULL) return -1;
|
||||
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) return -1;
|
||||
if (pRedoRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (code = mndTransAppendRedolog(pTrans, pRedoRaw) != 0) TAOS_RETURN(code);
|
||||
if (code = sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING) != 0) TAOS_RETURN(code);
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t mndSetDropArbGroupCommitLogs(STrans *pTrans, SArbGroup *pGroup) {
|
||||
int32_t code = 0;
|
||||
SSdbRaw *pCommitRaw = mndArbGroupActionEncode(pGroup);
|
||||
if (pCommitRaw == NULL) return -1;
|
||||
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
|
||||
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) return -1;
|
||||
if (pCommitRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
if (code = mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) TAOS_RETURN(code);
|
||||
if (code = sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED) != 0) TAOS_RETURN(code);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -343,6 +372,7 @@ static int32_t mndSendArbHeartBeatReq(SDnodeObj *pDnode, char *arbToken, int64_t
|
|||
}
|
||||
|
||||
static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
|
||||
int32_t code = 0;
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SArbGroup *pArbGroup = NULL;
|
||||
|
@ -377,7 +407,7 @@ static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
char arbToken[TSDB_ARB_TOKEN_SIZE];
|
||||
if (mndGetArbToken(pMnode, arbToken) != 0) {
|
||||
if (code = mndGetArbToken(pMnode, arbToken) != 0) {
|
||||
mError("failed to get arb token for arb-hb timer");
|
||||
pIter = taosHashIterate(pDnodeHash, NULL);
|
||||
while (pIter) {
|
||||
|
@ -386,7 +416,7 @@ static int32_t mndProcessArbHbTimer(SRpcMsg *pReq) {
|
|||
pIter = taosHashIterate(pDnodeHash, pIter);
|
||||
}
|
||||
taosHashCleanup(pDnodeHash);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int64_t nowMs = taosGetTimestampMs();
|
||||
|
@ -447,6 +477,7 @@ static void *mndBuildArbCheckSyncReq(int32_t *pContLen, int32_t vgId, char *arbT
|
|||
|
||||
static int32_t mndSendArbCheckSyncReq(SMnode *pMnode, int32_t vgId, char *arbToken, int64_t term, char *member0Token,
|
||||
char *member1Token) {
|
||||
int32_t code = 0;
|
||||
int32_t contLen = 0;
|
||||
void *pHead = mndBuildArbCheckSyncReq(&contLen, vgId, arbToken, term, member0Token, member1Token);
|
||||
if (!pHead) {
|
||||
|
@ -459,10 +490,12 @@ static int32_t mndSendArbCheckSyncReq(SMnode *pMnode, int32_t vgId, char *arbTok
|
|||
if (epSet.numOfEps == 0) {
|
||||
mError("vgId:%d, failed to send check-sync request since no epSet found", vgId);
|
||||
rpcFreeCont(pHead);
|
||||
return -1;
|
||||
code = -1;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
|
||||
code = tmsgSendReq(&epSet, &rpcMsg);
|
||||
if (code != 0) {
|
||||
mError("vgId:%d, failed to send check-sync request since 0x%x", vgId, code);
|
||||
} else {
|
||||
|
@ -502,11 +535,14 @@ static void *mndBuildArbSetAssignedLeaderReq(int32_t *pContLen, int32_t vgId, ch
|
|||
|
||||
static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, int32_t vgId, char *arbToken,
|
||||
int64_t term, char *memberToken) {
|
||||
int32_t code = 0;
|
||||
int32_t contLen = 0;
|
||||
void *pHead = mndBuildArbSetAssignedLeaderReq(&contLen, vgId, arbToken, term, memberToken);
|
||||
if (!pHead) {
|
||||
mError("vgId:%d, failed to build set-assigned request", vgId);
|
||||
return -1;
|
||||
code = -1;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
SRpcMsg rpcMsg = {.msgType = TDMT_SYNC_SET_ASSIGNED_LEADER, .pCont = pHead, .contLen = contLen};
|
||||
|
||||
|
@ -514,9 +550,11 @@ static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, i
|
|||
if (epSet.numOfEps == 0) {
|
||||
mError("dnodeId:%d vgId:%d, failed to send arb-set-assigned request to dnode since no epSet found", dnodeId, vgId);
|
||||
rpcFreeCont(pHead);
|
||||
return -1;
|
||||
code = -1;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
int32_t code = tmsgSendReq(&epSet, &rpcMsg);
|
||||
code = tmsgSendReq(&epSet, &rpcMsg);
|
||||
if (code != 0) {
|
||||
mError("dnodeId:%d vgId:%d, failed to send arb-set-assigned request to dnode since 0x%x", dnodeId, vgId, code);
|
||||
} else {
|
||||
|
@ -526,6 +564,7 @@ static int32_t mndSendArbSetAssignedLeaderReq(SMnode *pMnode, int32_t dnodeId, i
|
|||
}
|
||||
|
||||
static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
||||
int32_t code = 0;
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
SArbGroup *pArbGroup = NULL;
|
||||
|
@ -533,14 +572,16 @@ static int32_t mndProcessArbCheckSyncTimer(SRpcMsg *pReq) {
|
|||
void *pIter = NULL;
|
||||
|
||||
char arbToken[TSDB_ARB_TOKEN_SIZE];
|
||||
if (mndGetArbToken(pMnode, arbToken) != 0) {
|
||||
if (code = mndGetArbToken(pMnode, arbToken) != 0) {
|
||||
mError("failed to get arb token for arb-check-sync timer");
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
int64_t term = mndGetTerm(pMnode);
|
||||
if (term < 0) {
|
||||
mError("arb failed to get term since %s", terrstr());
|
||||
return -1;
|
||||
code = -1;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int64_t roleTimeMs = mndGetRoleTimeMs(pMnode);
|
||||
|
@ -747,13 +788,13 @@ _OVER:
|
|||
}
|
||||
|
||||
static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
|
||||
int ret = -1;
|
||||
int code = -1;
|
||||
size_t sz = 0;
|
||||
|
||||
SMArbUpdateGroupBatchReq req = {0};
|
||||
if (tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req) != 0) {
|
||||
if (code = tDeserializeSMArbUpdateGroupBatchReq(pReq->pCont, pReq->contLen, &req) != 0) {
|
||||
mError("arb failed to decode arb-update-group request");
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
SMnode *pMnode = pReq->info.node;
|
||||
|
@ -789,7 +830,7 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
|
|||
|
||||
mndTransAddArbGroupId(pTrans, newGroup.vgId);
|
||||
|
||||
if (mndSetCreateArbGroupCommitLogs(pTrans, &newGroup) != 0) {
|
||||
if (code = mndSetCreateArbGroupCommitLogs(pTrans, &newGroup) != 0) {
|
||||
mError("failed to update arbgroup in set commit log, vgId:%d, trans:%d, since %s", newGroup.vgId, pTrans->id,
|
||||
terrstr());
|
||||
goto _OVER;
|
||||
|
@ -803,13 +844,13 @@ static int32_t mndProcessArbUpdateGroupBatchReq(SRpcMsg *pReq) {
|
|||
sdbRelease(pMnode->pSdb, pOldGroup);
|
||||
}
|
||||
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
if (code = mndTransCheckConflict(pMnode, pTrans) != 0) goto _OVER;
|
||||
if (code = mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
ret = 0;
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
if (ret != 0) {
|
||||
if (code != 0) {
|
||||
// failed to update arbgroup
|
||||
for (size_t i = 0; i < sz; i++) {
|
||||
SMArbUpdateGroup *pUpdateGroup = taosArrayGet(req.updateArray, i);
|
||||
|
@ -819,7 +860,7 @@ _OVER:
|
|||
|
||||
mndTransDrop(pTrans);
|
||||
tFreeSMArbUpdateGroupBatchReq(&req);
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
|
||||
static void mndArbGroupDupObj(SArbGroup *pGroup, SArbGroup *pNew) {
|
||||
|
@ -841,10 +882,11 @@ static void mndArbGroupResetAssignedLeader(SArbGroup *pGroup) {
|
|||
}
|
||||
|
||||
static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
|
||||
int32_t ret = -1;
|
||||
int32_t code = -1;
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_ARBGROUP, NULL, "update-arbgroup");
|
||||
if (pTrans == NULL) {
|
||||
mError("failed to update arbgroup in create trans, vgId:%d, since %s", pNew->vgId, terrstr());
|
||||
if (terrno != 0) code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -854,23 +896,22 @@ static int32_t mndArbGroupUpdateTrans(SMnode *pMnode, SArbGroup *pNew) {
|
|||
pNew->assignedLeader.token, pNew->assignedLeader.acked);
|
||||
|
||||
mndTransAddArbGroupId(pTrans, pNew->vgId);
|
||||
if (mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||
ret = -1;
|
||||
if (code = mndTransCheckConflict(pMnode, pTrans) != 0) {
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndSetCreateArbGroupCommitLogs(pTrans, pNew) != 0) {
|
||||
mError("failed to update arbgroup in set commit log, vgId:%d, since %s", pNew->vgId, terrstr());
|
||||
if (code = mndSetCreateArbGroupCommitLogs(pTrans, pNew) != 0) {
|
||||
mError("failed to update arbgroup in set commit log, vgId:%d, since %s", pNew->vgId, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
if (code = mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
|
||||
|
||||
ret = 0;
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
mndTransDrop(pTrans);
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
|
||||
bool mndUpdateArbGroupByHeartBeat(SArbGroup *pGroup, SVArbHbRspMember *pRspMember, int64_t nowMs, int32_t dnodeId,
|
||||
|
@ -993,11 +1034,13 @@ _OVER:
|
|||
}
|
||||
|
||||
static int32_t mndUpdateArbSync(SMnode *pMnode, int32_t vgId, char *member0Token, char *member1Token, bool newIsSync) {
|
||||
int32_t code = 0;
|
||||
SArbGroup *pGroup = sdbAcquire(pMnode->pSdb, SDB_ARBGROUP, &vgId);
|
||||
if (pGroup == NULL) {
|
||||
terrno = TSDB_CODE_NOT_FOUND;
|
||||
mInfo("failed to update arb sync, vgId:%d not found", vgId);
|
||||
return -1;
|
||||
code = -1;
|
||||
if (terrno != 0) code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
SArbGroup newGroup = {0};
|
||||
|
@ -1018,38 +1061,36 @@ static int32_t mndProcessArbHbRsp(SRpcMsg *pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t ret = -1;
|
||||
int32_t code = -1;
|
||||
|
||||
SMnode *pMnode = pRsp->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
char arbToken[TSDB_ARB_TOKEN_SIZE];
|
||||
if (mndGetArbToken(pMnode, arbToken) != 0) {
|
||||
if (code = mndGetArbToken(pMnode, arbToken) != 0) {
|
||||
mError("failed to get arb token for arb-hb response");
|
||||
terrno = TSDB_CODE_NOT_FOUND;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
SVArbHeartBeatRsp arbHbRsp = {0};
|
||||
if (tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp) != 0) {
|
||||
if (code = tDeserializeSVArbHeartBeatRsp(pRsp->pCont, pRsp->contLen, &arbHbRsp) != 0) {
|
||||
mInfo("arb hb-rsp des failed, since:%s", tstrerror(pRsp->code));
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (mndArbCheckToken(arbToken, arbHbRsp.arbToken) != 0) {
|
||||
mInfo("arb hearbeat skip update for dnodeId:%d, arb token mismatch, local:[%s] msg:[%s]", arbHbRsp.dnodeId,
|
||||
arbToken, arbHbRsp.arbToken);
|
||||
terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
|
||||
code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
(void)mndUpdateArbHeartBeat(pMnode, arbHbRsp.dnodeId, arbHbRsp.hbMembers);
|
||||
ret = 0;
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
tFreeSVArbHeartBeatRsp(&arbHbRsp);
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
||||
|
@ -1058,27 +1099,25 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t ret = -1;
|
||||
int32_t code = -1;
|
||||
|
||||
SMnode *pMnode = pRsp->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
char arbToken[TSDB_ARB_TOKEN_SIZE];
|
||||
if (mndGetArbToken(pMnode, arbToken) != 0) {
|
||||
if (code = mndGetArbToken(pMnode, arbToken) != 0) {
|
||||
mError("failed to get arb token for arb-check-sync response");
|
||||
terrno = TSDB_CODE_NOT_FOUND;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
SVArbCheckSyncRsp syncRsp = {0};
|
||||
if (tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) {
|
||||
if (code = tDeserializeSVArbCheckSyncRsp(pRsp->pCont, pRsp->contLen, &syncRsp) != 0) {
|
||||
mInfo("arb check-sync-rsp des failed, since:%s", tstrerror(pRsp->code));
|
||||
if (pRsp->code == TSDB_CODE_MND_ARB_TOKEN_MISMATCH) {
|
||||
terrno = TSDB_CODE_SUCCESS;
|
||||
return 0;
|
||||
}
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (mndArbCheckToken(arbToken, syncRsp.arbToken) != 0) {
|
||||
|
@ -1089,16 +1128,16 @@ static int32_t mndProcessArbCheckSyncRsp(SRpcMsg *pRsp) {
|
|||
}
|
||||
|
||||
bool newIsSync = (syncRsp.errCode == TSDB_CODE_SUCCESS);
|
||||
if (mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync) != 0) {
|
||||
if (code = mndUpdateArbSync(pMnode, syncRsp.vgId, syncRsp.member0Token, syncRsp.member1Token, newIsSync) != 0) {
|
||||
mInfo("failed to update arb sync for vgId:%d, since:%s", syncRsp.vgId, terrstr());
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
ret = 0;
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
tFreeSVArbCheckSyncRsp(&syncRsp);
|
||||
return ret;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
bool mndUpdateArbGroupBySetAssignedLeader(SArbGroup *pGroup, int32_t vgId, char *memberToken, int32_t errcode,
|
||||
|
@ -1138,35 +1177,35 @@ static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t ret = -1;
|
||||
int32_t code = -1;
|
||||
|
||||
SMnode *pMnode = pRsp->info.node;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
char arbToken[TSDB_ARB_TOKEN_SIZE];
|
||||
if (mndGetArbToken(pMnode, arbToken) != 0) {
|
||||
if (code = mndGetArbToken(pMnode, arbToken) != 0) {
|
||||
mError("failed to get arb token for arb-set-assigned response");
|
||||
terrno = TSDB_CODE_NOT_FOUND;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
SVArbSetAssignedLeaderRsp setAssignedRsp = {0};
|
||||
if (tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp) != 0) {
|
||||
if (code = tDeserializeSVArbSetAssignedLeaderRsp(pRsp->pCont, pRsp->contLen, &setAssignedRsp) != 0) {
|
||||
mInfo("arb set-assigned-rsp des failed, since:%s", tstrerror(pRsp->code));
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (mndArbCheckToken(arbToken, setAssignedRsp.arbToken) != 0) {
|
||||
mInfo("skip update arb assigned for vgId:%d, arb token mismatch, local:[%s] msg:[%s]", setAssignedRsp.vgId,
|
||||
arbToken, setAssignedRsp.arbToken);
|
||||
terrno = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
|
||||
code = TSDB_CODE_MND_ARB_TOKEN_MISMATCH;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
SArbGroup *pGroup = mndAcquireArbGroup(pMnode, setAssignedRsp.vgId);
|
||||
if (!pGroup) {
|
||||
mError("failed to set arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, terrstr());
|
||||
code = -1;
|
||||
if (terrno != 0) code = terrno;
|
||||
goto _OVER;
|
||||
}
|
||||
|
||||
|
@ -1174,17 +1213,17 @@ static int32_t mndProcessArbSetAssignedLeaderRsp(SRpcMsg *pRsp) {
|
|||
bool updateAssigned = mndUpdateArbGroupBySetAssignedLeader(pGroup, setAssignedRsp.vgId, setAssignedRsp.memberToken,
|
||||
pRsp->code, &newGroup);
|
||||
if (updateAssigned) {
|
||||
if (mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) {
|
||||
mInfo("failed to pullup update arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, terrstr());
|
||||
if (code = mndPullupArbUpdateGroup(pMnode, &newGroup) != 0) {
|
||||
mInfo("failed to pullup update arb assigned for vgId:%d, since:%s", setAssignedRsp.vgId, tstrerror(code));
|
||||
goto _OVER;
|
||||
}
|
||||
}
|
||||
|
||||
ret = 0;
|
||||
code = 0;
|
||||
|
||||
_OVER:
|
||||
tFreeSVArbSetAssignedLeaderRsp(&setAssignedRsp);
|
||||
return ret;
|
||||
return code;
|
||||
}
|
||||
|
||||
static int32_t mndRetrieveArbGroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
|
||||
|
|
|
@ -14,8 +14,8 @@
|
|||
*/
|
||||
|
||||
#define _DEFAULT_SOURCE
|
||||
#include "audit.h"
|
||||
#include "mndCluster.h"
|
||||
#include "audit.h"
|
||||
#include "mndGrant.h"
|
||||
#include "mndPrivilege.h"
|
||||
#include "mndShow.h"
|
||||
|
@ -61,11 +61,13 @@ int32_t mndInitCluster(SMnode *pMnode) {
|
|||
void mndCleanupCluster(SMnode *pMnode) {}
|
||||
|
||||
int32_t mndGetClusterName(SMnode *pMnode, char *clusterName, int32_t len) {
|
||||
int32_t code = 0;
|
||||
SSdb *pSdb = pMnode->pSdb;
|
||||
|
||||
SClusterObj *pCluster = sdbAcquire(pSdb, SDB_CLUSTER, &pMnode->clusterId);
|
||||
if (pCluster == NULL) {
|
||||
return -1;
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
tstrncpy(clusterName, pCluster->name, len);
|
||||
|
@ -247,7 +249,10 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
|
|||
mInfo("cluster:%" PRId64 ", name is %s", clusterObj.id, clusterObj.name);
|
||||
|
||||
SSdbRaw *pRaw = mndClusterActionEncode(&clusterObj);
|
||||
if (pRaw == NULL) return -1;
|
||||
if (pRaw == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||
|
||||
mInfo("cluster:%" PRId64 ", will be created when deploying, raw:%p", clusterObj.id, pRaw);
|
||||
|
@ -256,21 +261,22 @@ static int32_t mndCreateDefaultCluster(SMnode *pMnode) {
|
|||
if (pTrans == NULL) {
|
||||
sdbFreeRaw(pRaw);
|
||||
mError("cluster:%" PRId64 ", failed to create since %s", clusterObj.id, terrstr());
|
||||
return -1;
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
mInfo("trans:%d, used to create cluster:%" PRId64, pTrans->id, clusterObj.id);
|
||||
|
||||
if (mndTransAppendCommitlog(pTrans, pRaw) != 0) {
|
||||
if (code = mndTransAppendCommitlog(pTrans, pRaw) != 0) {
|
||||
mError("trans:%d, failed to commit redo log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
(void)sdbSetRawStatus(pRaw, SDB_STATUS_READY);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
if (code = mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -347,22 +353,33 @@ static int32_t mndProcessUptimeTimer(SRpcMsg *pReq) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t code = 0;
|
||||
mInfo("update cluster uptime to %d", clusterObj.upTime);
|
||||
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "update-uptime");
|
||||
if (pTrans == NULL) return -1;
|
||||
if (pTrans == NULL) {
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
SSdbRaw *pCommitRaw = mndClusterActionEncode(&clusterObj);
|
||||
if (pCommitRaw == NULL || mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
if (pCommitRaw == NULL) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
code = terrno;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
if (code = mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
|
||||
mError("trans:%d, failed to append commit log since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
(void)sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
|
||||
|
||||
if (mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
if (code = mndTransPrepare(pMnode, pTrans) != 0) {
|
||||
mError("trans:%d, failed to prepare since %s", pTrans->id, terrstr());
|
||||
mndTransDrop(pTrans);
|
||||
return -1;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
mndTransDrop(pTrans);
|
||||
|
@ -374,13 +391,12 @@ int32_t mndProcessConfigClusterReq(SRpcMsg *pReq) {
|
|||
SMnode *pMnode = pReq->info.node;
|
||||
SMCfgClusterReq cfgReq = {0};
|
||||
if (tDeserializeSMCfgClusterReq(pReq->pCont, pReq->contLen, &cfgReq) != 0) {
|
||||
terrno = TSDB_CODE_INVALID_MSG;
|
||||
return -1;
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
mInfo("cluster: start to config, option:%s, value:%s", cfgReq.config, cfgReq.value);
|
||||
if (mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_CLUSTER) != 0) {
|
||||
code = terrno != 0 ? terrno : TSDB_CODE_MND_NO_RIGHTS;
|
||||
if (code = mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CONFIG_CLUSTER) != 0) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
|
@ -410,17 +426,17 @@ int32_t mndProcessConfigClusterReq(SRpcMsg *pReq) {
|
|||
}
|
||||
|
||||
{ // audit
|
||||
auditRecord(pReq, pMnode->clusterId, "alterCluster", "", "", cfgReq.sql, TMIN(cfgReq.sqlLen, GRANT_ACTIVE_HEAD_LEN << 1));
|
||||
auditRecord(pReq, pMnode->clusterId, "alterCluster", "", "", cfgReq.sql,
|
||||
TMIN(cfgReq.sqlLen, GRANT_ACTIVE_HEAD_LEN << 1));
|
||||
}
|
||||
_exit:
|
||||
tFreeSMCfgClusterReq(&cfgReq);
|
||||
if (code != 0) {
|
||||
terrno = code;
|
||||
mError("cluster: failed to config:%s %s since %s", cfgReq.config, cfgReq.value, terrstr());
|
||||
} else {
|
||||
mInfo("cluster: success to config:%s %s", cfgReq.config, cfgReq.value);
|
||||
}
|
||||
return code;
|
||||
TAOS_RETURN(code);
|
||||
}
|
||||
|
||||
int32_t mndProcessConfigClusterRsp(SRpcMsg *pRsp) {
|
||||
|
|
Loading…
Reference in New Issue