Merge branch 'feature/sync-refactor' of https://github.com/taosdata/TDengine into feature/sync-refactor
This commit is contained in:
commit
0beb323653
|
@ -826,7 +826,13 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms) {
|
||||||
|
|
||||||
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
|
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
int32_t electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
|
int32_t electMS;
|
||||||
|
|
||||||
|
if (pSyncNode->pRaftCfg->isStandBy) {
|
||||||
|
electMS = TIMER_MAX_MS;
|
||||||
|
} else {
|
||||||
|
electMS = syncUtilElectRandomMS(pSyncNode->electBaseLine, 2 * pSyncNode->electBaseLine);
|
||||||
|
}
|
||||||
ret = syncNodeRestartElectTimer(pSyncNode, electMS);
|
ret = syncNodeRestartElectTimer(pSyncNode, electMS);
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
|
@ -184,8 +184,8 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
|
||||||
const char* linuxErrMsg = strerror(errno);
|
const char* linuxErrMsg = strerror(errno);
|
||||||
sError("raftLogGetEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
|
sError("raftLogGetEntry error, err:%d %X, msg:%s, linuxErr:%d, linuxErrMsg:%s", err, err, errStr, linuxErr,
|
||||||
linuxErrMsg);
|
linuxErrMsg);
|
||||||
walCloseReadHandle(pWalHandle);
|
|
||||||
ASSERT(0);
|
ASSERT(0);
|
||||||
|
walCloseReadHandle(pWalHandle);
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -230,7 +230,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
||||||
uint16_t port;
|
uint16_t port;
|
||||||
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
|
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
|
||||||
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
|
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
|
||||||
sTrace("sync event snapshot send to %s:%d finish seq:%d ack:%d send msg:%s", host, port, pSender->seq, pSender->ack,
|
sTrace("sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu send msg:%s",
|
||||||
|
host, port, pSender->seq, pSender->ack, pSender->snapshot.lastApplyIndex, pSender->snapshot.lastApplyTerm,
|
||||||
msgStr);
|
msgStr);
|
||||||
} else {
|
} else {
|
||||||
sTrace("sync event snapshot send to %s:%d sending seq:%d ack:%d send msg:%s", host, port, pSender->seq,
|
sTrace("sync event snapshot send to %s:%d sending seq:%d ack:%d send msg:%s", host, port, pSender->seq,
|
||||||
|
@ -471,8 +472,11 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
needRsp = true;
|
needRsp = true;
|
||||||
|
|
||||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||||
sTrace("sync event snapshot recv begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack,
|
char host[128];
|
||||||
pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
sTrace("sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, port,
|
||||||
|
pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
||||||
taosMemoryFree(msgStr);
|
taosMemoryFree(msgStr);
|
||||||
|
|
||||||
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
|
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
|
||||||
|
@ -486,10 +490,13 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
|
char *logSimpleStr = logStoreSimple2Str(pSyncNode->pLogStore);
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
sInfo(
|
sInfo(
|
||||||
"sync event snapshot recv finish, update log begin index:%ld, snapshot.lastApplyIndex:%ld, "
|
"sync event snapshot recv from %s:%d finish, update log begin index:%ld, snapshot.lastApplyIndex:%ld, "
|
||||||
"snapshot.lastApplyTerm:%lu, raft log:%s",
|
"snapshot.lastApplyTerm:%lu, raft log:%s",
|
||||||
pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr);
|
host, port, pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr);
|
||||||
taosMemoryFree(logSimpleStr);
|
taosMemoryFree(logSimpleStr);
|
||||||
|
|
||||||
pReceiver->pWriter = NULL;
|
pReceiver->pWriter = NULL;
|
||||||
|
@ -498,8 +505,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
needRsp = true;
|
needRsp = true;
|
||||||
|
|
||||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||||
sTrace("sync event snapshot recv end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack,
|
sTrace("sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host, port,
|
||||||
pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
||||||
taosMemoryFree(msgStr);
|
taosMemoryFree(msgStr);
|
||||||
|
|
||||||
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
|
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
|
||||||
|
@ -507,9 +514,13 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
snapshotReceiverStop(pReceiver, false);
|
snapshotReceiverStop(pReceiver, false);
|
||||||
needRsp = false;
|
needRsp = false;
|
||||||
|
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
|
||||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||||
sTrace("sync event snapshot recv force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack,
|
sTrace("sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host,
|
||||||
pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
||||||
|
|
||||||
taosMemoryFree(msgStr);
|
taosMemoryFree(msgStr);
|
||||||
|
|
||||||
|
@ -524,8 +535,11 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
needRsp = true;
|
needRsp = true;
|
||||||
|
|
||||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||||
sTrace("sync event snapshot recv receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack,
|
char host[128];
|
||||||
pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pMsg->srcId.addr, host, sizeof(host), &port);
|
||||||
|
sTrace("sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", host,
|
||||||
|
port, pReceiver->ack, pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
||||||
taosMemoryFree(msgStr);
|
taosMemoryFree(msgStr);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
|
|
Loading…
Reference in New Issue