Merge branch '3.0' of https://github.com/taosdata/TDengine into feat/tsdb_refact
This commit is contained in:
commit
e6e629ae84
|
@ -1294,6 +1294,14 @@ typedef struct {
|
||||||
int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
|
int32_t tSerializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
|
||||||
int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
|
int32_t tDeserializeSDCreateMnodeReq(void* buf, int32_t bufLen, SDCreateMnodeReq* pReq);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t dnodeId;
|
||||||
|
int8_t standby;
|
||||||
|
} SSetStandbyReq;
|
||||||
|
|
||||||
|
int32_t tSerializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq);
|
||||||
|
int32_t tDeserializeSSetStandbyReq(void* buf, int32_t bufLen, SSetStandbyReq* pReq);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t connId;
|
int32_t connId;
|
||||||
int32_t queryId;
|
int32_t queryId;
|
||||||
|
|
|
@ -97,6 +97,7 @@ enum {
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_MNODE, "create-mnode", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_MNODE, "create-mnode", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_MNODE, "alter-mnode", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_MNODE, "alter-mnode", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_MNODE, "drop-mnode", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_DROP_MNODE, "drop-mnode", NULL, NULL)
|
||||||
|
TD_DEF_MSG_TYPE(TDMT_MND_SET_STANDBY, "set-mnode-standby", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_QNODE, "create-qnode", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_QNODE, "create-qnode", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_QNODE, "alter-qnode", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_QNODE, "alter-qnode", NULL, NULL)
|
||||||
TD_DEF_MSG_TYPE(TDMT_MND_DROP_QNODE, "drop-qnode", NULL, NULL)
|
TD_DEF_MSG_TYPE(TDMT_MND_DROP_QNODE, "drop-qnode", NULL, NULL)
|
||||||
|
|
|
@ -3507,6 +3507,33 @@ int32_t tDeserializeSDCreateMnodeReq(void *buf, int32_t bufLen, SDCreateMnodeReq
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t tSerializeSSetStandbyReq(void *buf, int32_t bufLen, SSetStandbyReq *pReq) {
|
||||||
|
SEncoder encoder = {0};
|
||||||
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartEncode(&encoder) < 0) return -1;
|
||||||
|
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
|
||||||
|
if (tEncodeI8(&encoder, pReq->standby) < 0) return -1;
|
||||||
|
tEndEncode(&encoder);
|
||||||
|
|
||||||
|
int32_t tlen = encoder.pos;
|
||||||
|
tEncoderClear(&encoder);
|
||||||
|
return tlen;
|
||||||
|
}
|
||||||
|
|
||||||
|
int32_t tDeserializeSSetStandbyReq(void *buf, int32_t bufLen, SSetStandbyReq *pReq) {
|
||||||
|
SDecoder decoder = {0};
|
||||||
|
tDecoderInit(&decoder, buf, bufLen);
|
||||||
|
|
||||||
|
if (tStartDecode(&decoder) < 0) return -1;
|
||||||
|
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
|
||||||
|
if (tDecodeI8(&decoder, &pReq->standby) < 0) return -1;
|
||||||
|
tEndDecode(&decoder);
|
||||||
|
|
||||||
|
tDecoderClear(&decoder);
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
int32_t tSerializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {
|
int32_t tSerializeSAuthReq(void *buf, int32_t bufLen, SAuthReq *pReq) {
|
||||||
SEncoder encoder = {0};
|
SEncoder encoder = {0};
|
||||||
tEncoderInit(&encoder, buf, bufLen);
|
tEncoderInit(&encoder, buf, bufLen);
|
||||||
|
|
|
@ -155,6 +155,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_MNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_MNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_QNODE, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
|
||||||
|
@ -235,6 +236,7 @@ SArray *mmGetMsgHandles() {
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_APPEND_ENTRIES_REPLY, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_SEND, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||||
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
if (dmSetMgmtHandle(pArray, TDMT_SYNC_SNAPSHOT_RSP, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
|
||||||
|
if (dmSetMgmtHandle(pArray, TDMT_MND_SET_STANDBY, mmPutNodeMsgToSyncQueue, 0) == NULL) goto _OVER;
|
||||||
|
|
||||||
code = 0;
|
code = 0;
|
||||||
|
|
||||||
|
|
|
@ -428,7 +428,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
|
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
||||||
syncClientRequestDestroy(pSyncMsg);
|
syncClientRequestDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg);
|
code = syncNodeOnRequestVoteSnapshotCb(pSyncNode, pSyncMsg);
|
||||||
|
@ -445,7 +444,6 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg);
|
code = syncNodeOnAppendEntriesReplySnapshotCb(pSyncNode, pSyncMsg);
|
||||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
|
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
|
||||||
SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
|
SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
|
code = syncNodeOnSnapshotSendCb(pSyncNode, pSyncMsg);
|
||||||
|
@ -454,12 +452,14 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
|
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
|
code = syncNodeOnSnapshotRspCb(pSyncNode, pSyncMsg);
|
||||||
syncSnapshotRspDestroy(pSyncMsg);
|
syncSnapshotRspDestroy(pSyncMsg);
|
||||||
|
} else if (pMsg->msgType == TDMT_MND_SET_STANDBY) {
|
||||||
|
code = syncSetStandby(pMgmt->sync);
|
||||||
|
SRpcMsg rsp = {.code = code, .info = pMsg->info};
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
} else {
|
} else {
|
||||||
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
||||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
|
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
|
||||||
|
@ -493,6 +493,10 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
code = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
||||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||||
|
} else if (pMsg->msgType == TDMT_MND_SET_STANDBY) {
|
||||||
|
code = syncSetStandby(pMgmt->sync);
|
||||||
|
SRpcMsg rsp = {.code = code, .info = pMsg->info};
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
} else {
|
} else {
|
||||||
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
mError("failed to process msg:%p since invalid type:%s", pMsg, TMSG_INFO(pMsg->msgType));
|
||||||
code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
code = TAOS_SYNC_PROPOSE_OTHER_ERROR;
|
||||||
|
|
|
@ -55,6 +55,7 @@ int32_t mndInitMnode(SMnode *pMnode) {
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_MND_ALTER_MNODE_RSP, mndTransProcessRsp);
|
||||||
mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
|
mndSetMsgHandle(pMnode, TDMT_MND_DROP_MNODE, mndProcessDropMnodeReq);
|
||||||
mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);
|
mndSetMsgHandle(pMnode, TDMT_DND_DROP_MNODE_RSP, mndTransProcessRsp);
|
||||||
|
mndSetMsgHandle(pMnode, TDMT_MND_SET_STANDBY_RSP, mndTransProcessRsp);
|
||||||
|
|
||||||
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
|
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndRetrieveMnodes);
|
||||||
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);
|
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_MNODE, mndCancelGetNextMnode);
|
||||||
|
@ -460,6 +461,7 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
|
||||||
int32_t numOfReplicas = 0;
|
int32_t numOfReplicas = 0;
|
||||||
SDAlterMnodeReq alterReq = {0};
|
SDAlterMnodeReq alterReq = {0};
|
||||||
SDDropMnodeReq dropReq = {0};
|
SDDropMnodeReq dropReq = {0};
|
||||||
|
SSetStandbyReq standbyReq = {0};
|
||||||
SEpSet alterEpset = {0};
|
SEpSet alterEpset = {0};
|
||||||
SEpSet dropEpSet = {0};
|
SEpSet dropEpSet = {0};
|
||||||
|
|
||||||
|
@ -494,6 +496,31 @@ static int32_t mndSetDropMnodeRedoActions(SMnode *pMnode, STrans *pTrans, SDnode
|
||||||
dropEpSet.eps[0].port = pDnode->port;
|
dropEpSet.eps[0].port = pDnode->port;
|
||||||
memcpy(dropEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
memcpy(dropEpSet.eps[0].fqdn, pDnode->fqdn, TSDB_FQDN_LEN);
|
||||||
|
|
||||||
|
standbyReq.dnodeId = pDnode->id;
|
||||||
|
standbyReq.standby = 1;
|
||||||
|
|
||||||
|
{
|
||||||
|
int32_t contLen = tSerializeSSetStandbyReq(NULL, 0, &standbyReq) + sizeof(SMsgHead);
|
||||||
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
|
tSerializeSSetStandbyReq((char*)pReq + sizeof(SMsgHead), contLen, &standbyReq);
|
||||||
|
SMsgHead *pHead = pReq;
|
||||||
|
pHead->contLen = htonl(contLen);
|
||||||
|
pHead->vgId = htonl(MNODE_HANDLE);
|
||||||
|
|
||||||
|
STransAction action = {
|
||||||
|
.epSet = dropEpSet,
|
||||||
|
.pCont = pReq,
|
||||||
|
.contLen = contLen,
|
||||||
|
.msgType = TDMT_MND_SET_STANDBY,
|
||||||
|
.acceptableCode = TSDB_CODE_NODE_NOT_DEPLOYED,
|
||||||
|
};
|
||||||
|
|
||||||
|
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
|
||||||
|
taosMemoryFree(pReq);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
{
|
{
|
||||||
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
|
int32_t contLen = tSerializeSDCreateMnodeReq(NULL, 0, &alterReq);
|
||||||
void *pReq = taosMemoryMalloc(contLen);
|
void *pReq = taosMemoryMalloc(contLen);
|
||||||
|
|
|
@ -152,7 +152,7 @@ int32_t syncSetStandby(int64_t rid) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
@ -170,6 +170,7 @@ int32_t syncSetStandby(int64_t rid) {
|
||||||
raftCfgPersist(pSyncNode->pRaftCfg);
|
raftCfgPersist(pSyncNode->pRaftCfg);
|
||||||
|
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
|
sInfo("vgId:%d, set to standby", pSyncNode->vgId);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1157,6 +1158,13 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
|
||||||
|
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
// save snapshot senders
|
||||||
|
int32_t oldReplicaNum = pSyncNode->replicaNum;
|
||||||
|
SRaftId oldReplicasId[TSDB_MAX_REPLICA];
|
||||||
|
memcpy(oldReplicasId, pSyncNode->replicasId, sizeof(oldReplicasId));
|
||||||
|
SSyncSnapshotSender* oldSenders[TSDB_MAX_REPLICA];
|
||||||
|
memcpy(oldSenders, pSyncNode->senders, sizeof(oldSenders));
|
||||||
|
|
||||||
// init internal
|
// init internal
|
||||||
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
|
pSyncNode->myNodeInfo = pSyncNode->pRaftCfg->cfg.nodeInfo[pSyncNode->pRaftCfg->cfg.myIndex];
|
||||||
syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
|
syncUtilnodeInfo2raftId(&pSyncNode->myNodeInfo, pSyncNode->vgId, &pSyncNode->myRaftId);
|
||||||
|
@ -1187,6 +1195,27 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
|
||||||
|
|
||||||
pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
|
pSyncNode->quorum = syncUtilQuorum(pSyncNode->pRaftCfg->cfg.replicaNum);
|
||||||
|
|
||||||
|
// reset snapshot senders, memory leak
|
||||||
|
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
|
(pSyncNode->senders)[i] = NULL;
|
||||||
|
}
|
||||||
|
for (int i = 0; i < pSyncNode->replicaNum; ++i) {
|
||||||
|
for (int j = 0; j < TSDB_MAX_REPLICA; ++j) {
|
||||||
|
if (syncUtilSameId(&(pSyncNode->replicasId)[i], &oldReplicasId[j])) {
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr((pSyncNode->replicasId)[i].addr, host, sizeof(host), &port);
|
||||||
|
sDebug("vgId:%d sync event reset sender for %lu, %s:%d", pSyncNode->vgId, (pSyncNode->replicasId)[i].addr, host, port);
|
||||||
|
(pSyncNode->senders)[i] = oldSenders[j];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
for (int i = 0; i < TSDB_MAX_REPLICA; ++i) {
|
||||||
|
if ((pSyncNode->senders)[i] == NULL) {
|
||||||
|
(pSyncNode->senders)[i] = snapshotSenderCreate(pSyncNode, i);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool IamInOld = false;
|
bool IamInOld = false;
|
||||||
bool IamInNew = false;
|
bool IamInNew = false;
|
||||||
for (int i = 0; i < oldConfig.replicaNum; ++i) {
|
for (int i = 0; i < oldConfig.replicaNum; ++i) {
|
||||||
|
@ -1845,7 +1874,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
|
||||||
|
|
||||||
bool isDrop;
|
bool isDrop;
|
||||||
|
|
||||||
if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) {
|
//if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) {
|
||||||
|
if (IamInNew) {
|
||||||
syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);
|
syncNodeUpdateConfig(ths, &newSyncCfg, pEntry->index, &isDrop);
|
||||||
|
|
||||||
// change isStandBy to normal
|
// change isStandBy to normal
|
||||||
|
@ -1856,15 +1886,17 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
|
||||||
syncNodeBecomeFollower(ths, "config change");
|
syncNodeBecomeFollower(ths, "config change");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
syncNodeBecomeFollower(ths, "config change2");
|
||||||
|
}
|
||||||
|
|
||||||
if (gRaftDetailLog) {
|
if (gRaftDetailLog) {
|
||||||
char* sOld = syncCfg2Str(&oldSyncCfg);
|
char* sOld = syncCfg2Str(&oldSyncCfg);
|
||||||
char* sNew = syncCfg2Str(&newSyncCfg);
|
char* sNew = syncCfg2Str(&newSyncCfg);
|
||||||
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d \n", sOld, sNew, isDrop);
|
sInfo("==config change== 0x11 old:%s new:%s isDrop:%d index:%ld \n", sOld, sNew, isDrop, pEntry->index);
|
||||||
taosMemoryFree(sOld);
|
taosMemoryFree(sOld);
|
||||||
taosMemoryFree(sNew);
|
taosMemoryFree(sNew);
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
// always call FpReConfigCb
|
// always call FpReConfigCb
|
||||||
if (ths->pFsm->FpReConfigCb != NULL) {
|
if (ths->pFsm->FpReConfigCb != NULL) {
|
||||||
|
|
|
@ -755,6 +755,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
// sender receives ack, set seq = ack + 1, send msg from seq
|
// sender receives ack, set seq = ack + 1, send msg from seq
|
||||||
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
|
// if ack == SYNC_SNAPSHOT_SEQ_END, stop sender
|
||||||
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
|
int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) {
|
||||||
|
// if already drop replica, do not process
|
||||||
|
if (!syncNodeInRaftGroup(pSyncNode, &(pMsg->srcId)) && pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
|
sInfo("recv SyncSnapshotRsp maybe replica already dropped");
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
// get sender
|
// get sender
|
||||||
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
|
SSyncSnapshotSender *pSender = syncNodeGetSnapshotSender(pSyncNode, &(pMsg->srcId));
|
||||||
ASSERT(pSender != NULL);
|
ASSERT(pSender != NULL);
|
||||||
|
|
|
@ -58,7 +58,7 @@
|
||||||
# ---- mnode
|
# ---- mnode
|
||||||
./test.sh -f tsim/mnode/basic1.sim
|
./test.sh -f tsim/mnode/basic1.sim
|
||||||
./test.sh -f tsim/mnode/basic2.sim
|
./test.sh -f tsim/mnode/basic2.sim
|
||||||
./test.sh -f tsim/mnode/basic3.sim
|
#./test.sh -f tsim/mnode/basic3.sim
|
||||||
./test.sh -f tsim/mnode/basic4.sim
|
./test.sh -f tsim/mnode/basic4.sim
|
||||||
./test.sh -f tsim/mnode/basic5.sim
|
./test.sh -f tsim/mnode/basic5.sim
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue