feat: redistribute vgroup
This commit is contained in:
parent
2ba3b7538c
commit
7aa087a48d
|
@ -410,10 +410,10 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_SYN_INVALID_CHECKSUM TAOS_DEF_ERROR_CODE(0, 0x0908)
|
#define TSDB_CODE_SYN_INVALID_CHECKSUM TAOS_DEF_ERROR_CODE(0, 0x0908)
|
||||||
#define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909)
|
#define TSDB_CODE_SYN_INVALID_MSGLEN TAOS_DEF_ERROR_CODE(0, 0x0909)
|
||||||
#define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A)
|
#define TSDB_CODE_SYN_INVALID_MSGTYPE TAOS_DEF_ERROR_CODE(0, 0x090A)
|
||||||
|
#define TSDB_CODE_SYN_IS_LEADER TAOS_DEF_ERROR_CODE(0, 0x090B)
|
||||||
#define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x0910)
|
#define TSDB_CODE_SYN_NOT_LEADER TAOS_DEF_ERROR_CODE(0, 0x090C)
|
||||||
#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x0911)
|
#define TSDB_CODE_SYN_ONE_REPLICA TAOS_DEF_ERROR_CODE(0, 0x090D)
|
||||||
#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x0912)
|
#define TSDB_CODE_SYN_NOT_IN_NEW_CONFIG TAOS_DEF_ERROR_CODE(0, 0x090E)
|
||||||
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
|
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
|
||||||
|
|
||||||
// tq
|
// tq
|
||||||
|
|
|
@ -397,7 +397,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
|
|
||||||
char logBuf[512] = {0};
|
char logBuf[512] = {0};
|
||||||
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
|
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
|
||||||
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
snprintf(logBuf, sizeof(logBuf), "==mndProcessSyncMsg== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
||||||
static int64_t mndTick = 0;
|
static int64_t mndTick = 0;
|
||||||
if (++mndTick % 10 == 1) {
|
if (++mndTick % 10 == 1) {
|
||||||
mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
|
mTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||||
|
|
|
@ -256,14 +256,6 @@ void mndSyncStart(SMnode *pMnode) {
|
||||||
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
|
syncSetMsgCb(pMgmt->sync, &pMnode->msgCb);
|
||||||
syncStart(pMgmt->sync);
|
syncStart(pMgmt->sync);
|
||||||
mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby);
|
mDebug("mnode sync started, id:%" PRId64 " standby:%d", pMgmt->sync, pMgmt->standby);
|
||||||
|
|
||||||
/*
|
|
||||||
if (pMgmt->standby) {
|
|
||||||
syncStartStandBy(pMgmt->sync);
|
|
||||||
} else {
|
|
||||||
syncStart(pMgmt->sync);
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void mndSyncStop(SMnode *pMnode) {}
|
void mndSyncStop(SMnode *pMnode) {}
|
||||||
|
|
|
@ -300,108 +300,6 @@ void vnodeUpdateMetaRsp(SVnode *pVnode, STableMetaRsp *pMetaRsp) {
|
||||||
pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
|
pMetaRsp->precision = pVnode->config.tsdbCfg.precision;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|
||||||
int32_t ret = 0;
|
|
||||||
|
|
||||||
if (syncEnvIsStart()) {
|
|
||||||
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
|
||||||
assert(pSyncNode != NULL);
|
|
||||||
|
|
||||||
ESyncState state = syncGetMyRole(pVnode->sync);
|
|
||||||
SyncTerm currentTerm = syncGetMyTerm(pVnode->sync);
|
|
||||||
|
|
||||||
SMsgHead *pHead = pMsg->pCont;
|
|
||||||
|
|
||||||
char logBuf[512] = {0};
|
|
||||||
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
|
|
||||||
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
|
||||||
static int64_t vndTick = 0;
|
|
||||||
if (++vndTick % 10 == 1) {
|
|
||||||
vTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
|
|
||||||
}
|
|
||||||
syncRpcMsgLog2(logBuf, pMsg);
|
|
||||||
taosMemoryFree(syncNodeStr);
|
|
||||||
|
|
||||||
SRpcMsg *pRpcMsg = pMsg;
|
|
||||||
|
|
||||||
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
|
||||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
|
||||||
assert(pSyncMsg != NULL);
|
|
||||||
|
|
||||||
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
|
||||||
syncTimeoutDestroy(pSyncMsg);
|
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
|
|
||||||
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
|
|
||||||
assert(pSyncMsg != NULL);
|
|
||||||
|
|
||||||
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
|
||||||
syncPingDestroy(pSyncMsg);
|
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
|
||||||
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
|
||||||
assert(pSyncMsg != NULL);
|
|
||||||
|
|
||||||
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
|
||||||
syncPingReplyDestroy(pSyncMsg);
|
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
|
||||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
|
||||||
assert(pSyncMsg != NULL);
|
|
||||||
|
|
||||||
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
|
||||||
syncClientRequestDestroy(pSyncMsg);
|
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
|
||||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
|
||||||
assert(pSyncMsg != NULL);
|
|
||||||
|
|
||||||
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
|
||||||
syncRequestVoteDestroy(pSyncMsg);
|
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
|
||||||
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
|
||||||
assert(pSyncMsg != NULL);
|
|
||||||
|
|
||||||
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
|
||||||
syncRequestVoteReplyDestroy(pSyncMsg);
|
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
|
||||||
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
|
|
||||||
assert(pSyncMsg != NULL);
|
|
||||||
|
|
||||||
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
|
||||||
syncAppendEntriesDestroy(pSyncMsg);
|
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
|
||||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
|
||||||
assert(pSyncMsg != NULL);
|
|
||||||
|
|
||||||
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
|
||||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
|
|
||||||
ret = syncSetStandby(pVnode->sync);
|
|
||||||
vInfo("vgId:%d, set standby result:0x%x rid:%" PRId64, pVnode->config.vgId, ret, pVnode->sync);
|
|
||||||
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
|
|
||||||
tmsgSendRsp(&rsp);
|
|
||||||
} else {
|
|
||||||
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
|
|
||||||
ret = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
|
||||||
} else {
|
|
||||||
vError("==vnodeProcessSyncReq== error syncEnv stop");
|
|
||||||
ret = -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (ret != 0) {
|
|
||||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
|
||||||
}
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
static int32_t vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int32_t len, SRpcMsg *pRsp) {
|
||||||
SVCreateStbReq req = {0};
|
SVCreateStbReq req = {0};
|
||||||
SDecoder coder;
|
SDecoder coder;
|
||||||
|
|
|
@ -50,6 +50,33 @@ static inline void vnodePostBlockMsg(SVnode *pVnode, tmsg_t type) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t vnodeSetStandBy(SVnode *pVnode) {
|
||||||
|
vInfo("vgId:%d, start to set standby", TD_VID(pVnode));
|
||||||
|
|
||||||
|
if (syncSetStandby(pVnode->sync) == 0) {
|
||||||
|
vInfo("vgId:%d, set standby success", TD_VID(pVnode));
|
||||||
|
return 0;
|
||||||
|
} else if (terrno != TSDB_CODE_SYN_IS_LEADER) {
|
||||||
|
vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (syncLeaderTransfer(pVnode->sync) != 0) {
|
||||||
|
vError("vgId:%d, failed to transfer leader since:%s", TD_VID(pVnode), terrstr());
|
||||||
|
return -1;
|
||||||
|
} else {
|
||||||
|
vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (syncSetStandby(pVnode->sync) == 0) {
|
||||||
|
vInfo("vgId:%d, set standby success", TD_VID(pVnode));
|
||||||
|
return 0;
|
||||||
|
} else {
|
||||||
|
vError("vgId:%d, failed to set standby since %s", TD_VID(pVnode), terrstr());
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
SAlterVnodeReq req = {0};
|
SAlterVnodeReq req = {0};
|
||||||
if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
|
if (tDeserializeSAlterVnodeReq((char *)pMsg->pCont + sizeof(SMsgHead), pMsg->contLen - sizeof(SMsgHead), &req) != 0) {
|
||||||
|
@ -74,14 +101,19 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
|
|
||||||
int32_t code = syncPropose(pVnode->sync, &rpcMsg, false);
|
int32_t code = syncPropose(pVnode->sync, &rpcMsg, false);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
vDebug("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
|
if (terrno != 0) code = terrno;
|
||||||
if (syncLeaderTransfer(pVnode->sync) != 0) {
|
|
||||||
vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
|
vInfo("vgId:%d, failed to propose reconfig msg since %s", TD_VID(pVnode), terrstr());
|
||||||
} else {
|
if (terrno == TSDB_CODE_SYN_IS_LEADER) {
|
||||||
vDebug("vgId:%d, transfer leader success, propose reconfig config again", TD_VID(pVnode));
|
if (syncLeaderTransfer(pVnode->sync) != 0) {
|
||||||
|
vError("vgId:%d, failed to transfer leader since %s", TD_VID(pVnode), terrstr());
|
||||||
|
} else {
|
||||||
|
vInfo("vgId:%d, transfer leader success", TD_VID(pVnode));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
terrno = code;
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -108,7 +140,6 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
|
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
|
vnodeAccumBlockMsg(pVnode, pMsg->msgType);
|
||||||
|
|
||||||
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
} else if (code == -1 && terrno == TSDB_CODE_SYN_NOT_LEADER) {
|
||||||
SEpSet newEpSet = {0};
|
SEpSet newEpSet = {0};
|
||||||
syncGetEpSet(pVnode->sync, &newEpSet);
|
syncGetEpSet(pVnode->sync, &newEpSet);
|
||||||
|
@ -170,6 +201,108 @@ void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
|
int32_t ret = 0;
|
||||||
|
|
||||||
|
if (syncEnvIsStart()) {
|
||||||
|
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
||||||
|
assert(pSyncNode != NULL);
|
||||||
|
|
||||||
|
ESyncState state = syncGetMyRole(pVnode->sync);
|
||||||
|
SyncTerm currentTerm = syncGetMyTerm(pVnode->sync);
|
||||||
|
|
||||||
|
SMsgHead *pHead = pMsg->pCont;
|
||||||
|
|
||||||
|
char logBuf[512] = {0};
|
||||||
|
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
|
||||||
|
snprintf(logBuf, sizeof(logBuf), "==vnodeProcessSyncReq== msgType:%d, syncNode: %s", pMsg->msgType, syncNodeStr);
|
||||||
|
static int64_t vndTick = 0;
|
||||||
|
if (++vndTick % 10 == 1) {
|
||||||
|
vTrace("sync trace msg:%s, %s", TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||||
|
}
|
||||||
|
syncRpcMsgLog2(logBuf, pMsg);
|
||||||
|
taosMemoryFree(syncNodeStr);
|
||||||
|
|
||||||
|
SRpcMsg *pRpcMsg = pMsg;
|
||||||
|
|
||||||
|
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
||||||
|
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
||||||
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
|
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
||||||
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
|
||||||
|
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
|
||||||
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
|
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
||||||
|
syncPingDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
||||||
|
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
||||||
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
|
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
||||||
|
syncPingReplyDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||||
|
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
||||||
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
|
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg);
|
||||||
|
syncClientRequestDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||||
|
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
||||||
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
|
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
||||||
|
syncRequestVoteDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
||||||
|
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
||||||
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
|
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
||||||
|
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
||||||
|
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
|
||||||
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
|
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
||||||
|
syncAppendEntriesDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
||||||
|
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
||||||
|
assert(pSyncMsg != NULL);
|
||||||
|
|
||||||
|
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
||||||
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
|
||||||
|
ret = vnodeSetStandBy(pVnode);
|
||||||
|
if (ret != 0 && terrno != 0) ret = terrno;
|
||||||
|
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
} else {
|
||||||
|
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
|
||||||
|
ret = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
syncNodeRelease(pSyncNode);
|
||||||
|
} else {
|
||||||
|
vError("==vnodeProcessSyncReq== error syncEnv stop");
|
||||||
|
ret = -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (ret != 0) {
|
||||||
|
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
static int32_t vnodeSyncEqMsg(const SMsgCb *msgcb, SRpcMsg *pMsg) {
|
||||||
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
|
int32_t code = tmsgPutToQueue(msgcb, SYNC_QUEUE, pMsg);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
|
@ -258,17 +391,17 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
|
||||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; }
|
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; }
|
||||||
|
|
||||||
int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; }
|
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; }
|
||||||
|
|
||||||
int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; }
|
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; }
|
||||||
|
|
||||||
int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { return 0; }
|
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { return 0; }
|
||||||
|
|
||||||
int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; }
|
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; }
|
||||||
|
|
||||||
int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { return 0; }
|
static int32_t vnodeSnapshotDoWrite(struct SSyncFSM *pFsm, void *pWriter, void *pBuf, int32_t len) { return 0; }
|
||||||
|
|
||||||
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||||
|
@ -279,7 +412,6 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
|
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
|
||||||
pFsm->FpRestoreFinishCb = NULL;
|
pFsm->FpRestoreFinishCb = NULL;
|
||||||
pFsm->FpReConfigCb = vnodeSyncReconfig;
|
pFsm->FpReConfigCb = vnodeSyncReconfig;
|
||||||
|
|
||||||
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
|
pFsm->FpSnapshotStartRead = vnodeSnapshotStartRead;
|
||||||
pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
|
pFsm->FpSnapshotStopRead = vnodeSnapshotStopRead;
|
||||||
pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
|
pFsm->FpSnapshotDoRead = vnodeSnapshotDoRead;
|
||||||
|
@ -292,7 +424,6 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
|
|
||||||
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
||||||
SSyncInfo syncInfo = {
|
SSyncInfo syncInfo = {
|
||||||
.isStandBy = false,
|
|
||||||
.snapshotEnable = false,
|
.snapshotEnable = false,
|
||||||
.vgId = pVnode->config.vgId,
|
.vgId = pVnode->config.vgId,
|
||||||
.isStandBy = pVnode->config.standby,
|
.isStandBy = pVnode->config.standby,
|
||||||
|
@ -321,13 +452,6 @@ int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
||||||
void vnodeSyncStart(SVnode *pVnode) {
|
void vnodeSyncStart(SVnode *pVnode) {
|
||||||
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
|
syncSetMsgCb(pVnode->sync, &pVnode->msgCb);
|
||||||
syncStart(pVnode->sync);
|
syncStart(pVnode->sync);
|
||||||
/*
|
|
||||||
if (pVnode->config.standby) {
|
|
||||||
syncStartStandBy(pVnode->sync);
|
|
||||||
} else {
|
|
||||||
syncStart(pVnode->sync);
|
|
||||||
}
|
|
||||||
*/
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
|
||||||
|
|
|
@ -156,7 +156,7 @@ int32_t syncSetStandby(int64_t rid) {
|
||||||
|
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
terrno = TSDB_CODE_SYN_IS_LEADER;
|
||||||
sError("failed to set standby since it is leader, rid:%" PRId64, rid);
|
sError("failed to set standby since it is leader, rid:%" PRId64, rid);
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
|
@ -412,7 +412,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_MISMATCHED_SIGNATURE, "Mismatched signature"
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, "Invalid msg checksum")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_CHECKSUM, "Invalid msg checksum")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGLEN, "Invalid msg length")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INVALID_MSGTYPE, "Invalid msg type")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_IS_LEADER, "Sync is leader")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_LEADER, "Sync not leader")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_ONE_REPLICA, "Sync one replica")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NOT_IN_NEW_CONFIG, "Sync not in new config")
|
||||||
|
|
Loading…
Reference in New Issue