fix: only send message to one vnode in the vgroup
This commit is contained in:
parent
0164158256
commit
f94eaa6730
|
@ -37,7 +37,7 @@ int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray
|
||||||
|
|
||||||
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||||
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||||
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -261,8 +261,7 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
|
||||||
sdbRelease(pSdb, pDb);
|
sdbRelease(pSdb, pDb);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
|
static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
|
||||||
bool isRedo) {
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
|
|
||||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
||||||
|
@ -279,49 +278,30 @@ static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *p
|
||||||
action.msgType = TDMT_DND_CREATE_VNODE;
|
action.msgType = TDMT_DND_CREATE_VNODE;
|
||||||
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
|
action.acceptableCode = TSDB_CODE_NODE_ALREADY_DEPLOYED;
|
||||||
|
|
||||||
if (isRedo) {
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
|
||||||
taosMemoryFree(pReq);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid,
|
static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
|
||||||
bool isRedo) {
|
|
||||||
STransAction action = {0};
|
STransAction action = {0};
|
||||||
|
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
|
||||||
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
|
|
||||||
if (pDnode == NULL) return -1;
|
|
||||||
action.epSet = mndGetDnodeEpset(pDnode);
|
|
||||||
mndReleaseDnode(pMnode, pDnode);
|
|
||||||
|
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void *pReq = mndBuildAlterVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
|
void *pReq = mndBuildAlterVnodeReq(pMnode, pDb, pVgroup, &contLen);
|
||||||
if (pReq == NULL) return -1;
|
if (pReq == NULL) return -1;
|
||||||
|
|
||||||
action.pCont = pReq;
|
action.pCont = pReq;
|
||||||
action.contLen = contLen;
|
action.contLen = contLen;
|
||||||
action.msgType = TDMT_VND_ALTER_VNODE;
|
action.msgType = TDMT_VND_ALTER_VNODE;
|
||||||
|
|
||||||
if (isRedo) {
|
|
||||||
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
taosMemoryFree(pReq);
|
taosMemoryFree(pReq);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
} else {
|
|
||||||
if (mndTransAppendUndoAction(pTrans, &action) != 0) {
|
|
||||||
taosMemoryFree(pReq);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -487,7 +467,7 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
|
|
||||||
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
||||||
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
|
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
|
||||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, true) != 0) {
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid) != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -726,12 +706,9 @@ static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *p
|
||||||
|
|
||||||
static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
|
static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
|
||||||
if (pVgroup->replica <= 0 || pVgroup->replica == pDb->cfg.replications) {
|
if (pVgroup->replica <= 0 || pVgroup->replica == pDb->cfg.replications) {
|
||||||
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup) != 0) {
|
||||||
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
|
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, true) != 0) {
|
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
SVgObj newVgroup = {0};
|
SVgObj newVgroup = {0};
|
||||||
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
|
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
|
||||||
|
@ -744,9 +721,9 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
newVgroup.replica = pDb->cfg.replications;
|
newVgroup.replica = pDb->cfg.replications;
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[0], true) != 0) return -1;
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
|
||||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1;
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1]) != 0) return -1;
|
||||||
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1;
|
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2]) != 0) return -1;
|
||||||
} else {
|
} else {
|
||||||
mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId);
|
mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId);
|
||||||
|
|
||||||
|
@ -757,7 +734,7 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
newVgroup.replica = pDb->cfg.replications;
|
newVgroup.replica = pDb->cfg.replications;
|
||||||
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[0], true) != 0) return -1;
|
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
|
||||||
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1;
|
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1;
|
||||||
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1;
|
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -986,7 +986,8 @@ static int32_t mndTransSendActionMsg(SMnode *pMnode, STrans *pTrans, SArray *pAr
|
||||||
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
|
memcpy(rpcMsg.pCont, pAction->pCont, pAction->contLen);
|
||||||
|
|
||||||
if (tmsgSendReq(&pAction->epSet, &rpcMsg) == 0) {
|
if (tmsgSendReq(&pAction->epSet, &rpcMsg) == 0) {
|
||||||
mDebug("trans:%d, action:%d is sent", pTrans->id, action);
|
mDebug("trans:%d, action:%d is sent to %s:%u", pTrans->id, action, pAction->epSet.eps[pAction->epSet.inUse].fqdn,
|
||||||
|
pAction->epSet.eps[pAction->epSet.inUse].port);
|
||||||
pAction->msgSent = 1;
|
pAction->msgSent = 1;
|
||||||
pAction->msgReceived = 0;
|
pAction->msgReceived = 0;
|
||||||
pAction->errCode = 0;
|
pAction->errCode = 0;
|
||||||
|
|
|
@ -256,7 +256,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
|
||||||
return pReq;
|
return pReq;
|
||||||
}
|
}
|
||||||
|
|
||||||
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
|
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
|
||||||
SAlterVnodeReq alterReq = {0};
|
SAlterVnodeReq alterReq = {0};
|
||||||
alterReq.vgVersion = pVgroup->version;
|
alterReq.vgVersion = pVgroup->version;
|
||||||
alterReq.buffer = pDb->cfg.buffer;
|
alterReq.buffer = pDb->cfg.buffer;
|
||||||
|
@ -285,16 +285,14 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgO
|
||||||
pReplica->port = pVgidDnode->port;
|
pReplica->port = pVgidDnode->port;
|
||||||
memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
|
memcpy(pReplica->fqdn, pVgidDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
mndReleaseDnode(pMnode, pVgidDnode);
|
mndReleaseDnode(pMnode, pVgidDnode);
|
||||||
|
|
||||||
if (pDnode->id == pVgid->dnodeId) {
|
|
||||||
alterReq.selfIndex = v;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#if 0
|
||||||
if (alterReq.selfIndex == -1) {
|
if (alterReq.selfIndex == -1) {
|
||||||
terrno = TSDB_CODE_MND_APP_ERROR;
|
terrno = TSDB_CODE_MND_APP_ERROR;
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq);
|
int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq);
|
||||||
if (contLen < 0) {
|
if (contLen < 0) {
|
||||||
|
|
Loading…
Reference in New Issue