Merge branch 'feature/sync-refactor' of https://github.com/taosdata/TDengine into feature/sync-refactor
This commit is contained in:
commit
71ff80101d
|
@ -497,30 +497,39 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
|
||||||
// prevLogIndex == -1
|
// prevLogIndex == -1
|
||||||
static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) {
|
static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) {
|
||||||
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) {
|
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
sTrace("syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld", pMsg->prevLogIndex);
|
sTrace("syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld", pMsg->prevLogIndex);
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
|
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||||
if (pMsg->prevLogIndex > myLastIndex) {
|
if (pMsg->prevLogIndex > myLastIndex) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
sTrace("syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld", pMsg->prevLogIndex,
|
sTrace("syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld", pMsg->prevLogIndex,
|
||||||
myLastIndex);
|
myLastIndex);
|
||||||
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
|
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
|
||||||
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
|
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
sTrace(
|
sTrace(
|
||||||
"syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, "
|
"syncNodeOnAppendEntriesLogOK true, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, "
|
||||||
"myPreLogTerm:%lu",
|
"myPreLogTerm:%lu",
|
||||||
pMsg->prevLogIndex, myLastIndex, pMsg->prevLogTerm, myPreLogTerm);
|
pMsg->prevLogIndex, myLastIndex, pMsg->prevLogTerm, myPreLogTerm);
|
||||||
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (gRaftDetailLog) {
|
||||||
sTrace(
|
sTrace(
|
||||||
"syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, "
|
"syncNodeOnAppendEntriesLogOK false, pMsg->prevLogIndex:%ld, myLastIndex:%ld, pMsg->prevLogTerm:%lu, "
|
||||||
"myPreLogTerm:%lu",
|
"myPreLogTerm:%lu",
|
||||||
pMsg->prevLogIndex, myLastIndex, pMsg->prevLogTerm, myPreLogTerm);
|
pMsg->prevLogIndex, myLastIndex, pMsg->prevLogTerm, myPreLogTerm);
|
||||||
|
}
|
||||||
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -119,7 +119,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
|
||||||
|
|
||||||
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
|
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
|
||||||
cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
|
cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
|
||||||
char * serialized = cJSON_Print(pJson);
|
char *serialized = cJSON_Print(pJson);
|
||||||
cJSON_Delete(pJson);
|
cJSON_Delete(pJson);
|
||||||
return serialized;
|
return serialized;
|
||||||
}
|
}
|
||||||
|
@ -146,9 +146,11 @@ void syncIndexMgrLog(SSyncIndexMgr *pObj) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) {
|
void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char *serialized = syncIndexMgr2Str(pObj);
|
char *serialized = syncIndexMgr2Str(pObj);
|
||||||
sTrace("syncIndexMgrLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("syncIndexMgrLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
|
void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
|
||||||
|
|
|
@ -852,10 +852,12 @@ int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRp
|
||||||
SEpSet epSet;
|
SEpSet epSet;
|
||||||
syncUtilraftId2EpSet(destRaftId, &epSet);
|
syncUtilraftId2EpSet(destRaftId, &epSet);
|
||||||
if (pSyncNode->FpSendMsg != NULL) {
|
if (pSyncNode->FpSendMsg != NULL) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* JsonStr = syncRpcMsg2Str(pMsg);
|
char* JsonStr = syncRpcMsg2Str(pMsg);
|
||||||
syncUtilJson2Line(JsonStr);
|
syncUtilJson2Line(JsonStr);
|
||||||
sTrace("sync send msg, vgId:%d, type:%d, msg:%s", pSyncNode->vgId, pMsg->msgType, JsonStr);
|
sTrace("sync send msg, vgId:%d, type:%d, msg:%s", pSyncNode->vgId, pMsg->msgType, JsonStr);
|
||||||
taosMemoryFree(JsonStr);
|
taosMemoryFree(JsonStr);
|
||||||
|
}
|
||||||
|
|
||||||
// htonl
|
// htonl
|
||||||
syncUtilMsgHtoN(pMsg->pCont);
|
syncUtilMsgHtoN(pMsg->pCont);
|
||||||
|
|
|
@ -145,9 +145,11 @@ void syncRpcMsgLog(SRpcMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRpcMsgLog2(char* s, SRpcMsg* pMsg) {
|
void syncRpcMsgLog2(char* s, SRpcMsg* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* serialized = syncRpcMsg2Str(pMsg);
|
char* serialized = syncRpcMsg2Str(pMsg);
|
||||||
sTrace("syncRpcMsgLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("syncRpcMsgLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- message process SyncTimeout----
|
// ---- message process SyncTimeout----
|
||||||
|
@ -274,9 +276,11 @@ void syncTimeoutLog(const SyncTimeout* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) {
|
void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* serialized = syncTimeout2Str(pMsg);
|
char* serialized = syncTimeout2Str(pMsg);
|
||||||
sTrace("syncTimeoutLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("syncTimeoutLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- message process SyncPing----
|
// ---- message process SyncPing----
|
||||||
|
@ -534,9 +538,11 @@ void syncPingLog(const SyncPing* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncPingLog2(char* s, const SyncPing* pMsg) {
|
void syncPingLog2(char* s, const SyncPing* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* serialized = syncPing2Str(pMsg);
|
char* serialized = syncPing2Str(pMsg);
|
||||||
sTrace("syncPingLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("syncPingLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- message process SyncPingReply----
|
// ---- message process SyncPingReply----
|
||||||
|
@ -794,9 +800,11 @@ void syncPingReplyLog(const SyncPingReply* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncPingReplyLog2(char* s, const SyncPingReply* pMsg) {
|
void syncPingReplyLog2(char* s, const SyncPingReply* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* serialized = syncPingReply2Str(pMsg);
|
char* serialized = syncPingReply2Str(pMsg);
|
||||||
sTrace("syncPingReplyLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("syncPingReplyLog2 | len:%zu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- message process SyncClientRequest----
|
// ---- message process SyncClientRequest----
|
||||||
|
@ -935,9 +943,11 @@ void syncClientRequestLog(const SyncClientRequest* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) {
|
void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* serialized = syncClientRequest2Str(pMsg);
|
char* serialized = syncClientRequest2Str(pMsg);
|
||||||
sTrace("syncClientRequestLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("syncClientRequestLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- message process SyncRequestVote----
|
// ---- message process SyncRequestVote----
|
||||||
|
@ -1084,9 +1094,11 @@ void syncRequestVoteLog(const SyncRequestVote* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg) {
|
void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* serialized = syncRequestVote2Str(pMsg);
|
char* serialized = syncRequestVote2Str(pMsg);
|
||||||
sTrace("syncRequestVoteLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("syncRequestVoteLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- message process SyncRequestVoteReply----
|
// ---- message process SyncRequestVoteReply----
|
||||||
|
@ -1230,9 +1242,11 @@ void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg) {
|
void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* serialized = syncRequestVoteReply2Str(pMsg);
|
char* serialized = syncRequestVoteReply2Str(pMsg);
|
||||||
sTrace("syncRequestVoteReplyLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("syncRequestVoteReplyLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- message process SyncAppendEntries----
|
// ---- message process SyncAppendEntries----
|
||||||
|
@ -1399,9 +1413,11 @@ void syncAppendEntriesLog(const SyncAppendEntries* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) {
|
void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* serialized = syncAppendEntries2Str(pMsg);
|
char* serialized = syncAppendEntries2Str(pMsg);
|
||||||
sTrace("syncAppendEntriesLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("syncAppendEntriesLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- message process SyncAppendEntriesReply----
|
// ---- message process SyncAppendEntriesReply----
|
||||||
|
@ -1551,9 +1567,11 @@ void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) {
|
void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* serialized = syncAppendEntriesReply2Str(pMsg);
|
char* serialized = syncAppendEntriesReply2Str(pMsg);
|
||||||
sTrace("syncAppendEntriesReplyLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("syncAppendEntriesReplyLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---- message process SyncApplyMsg----
|
// ---- message process SyncApplyMsg----
|
||||||
|
@ -1702,9 +1720,11 @@ void syncApplyMsgLog(const SyncApplyMsg* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg) {
|
void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* serialized = syncApplyMsg2Str(pMsg);
|
char* serialized = syncApplyMsg2Str(pMsg);
|
||||||
sTrace("syncApplyMsgLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("syncApplyMsgLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
|
@ -1870,9 +1890,11 @@ void syncSnapshotSendLog(const SyncSnapshotSend* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg) {
|
void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* serialized = syncSnapshotSend2Str(pMsg);
|
char* serialized = syncSnapshotSend2Str(pMsg);
|
||||||
sTrace("syncSnapshotSendLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("syncSnapshotSendLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
|
@ -2028,7 +2050,9 @@ void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg) {
|
void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* serialized = syncSnapshotRsp2Str(pMsg);
|
char* serialized = syncSnapshotRsp2Str(pMsg);
|
||||||
sTrace("syncSnapshotRspLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("syncSnapshotRspLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
|
@ -583,7 +583,9 @@ void logStoreSimpleLog(SSyncLogStore* pLogStore) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) {
|
void logStoreSimpleLog2(char* s, SSyncLogStore* pLogStore) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
char* serialized = logStoreSimple2Str(pLogStore);
|
char* serialized = logStoreSimple2Str(pLogStore);
|
||||||
sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
sTrace("logStoreSimpleLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
taosMemoryFree(serialized);
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -106,7 +106,11 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
|
||||||
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
|
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
|
||||||
|
|
||||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||||
sTrace("snapshot send begin seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
|
||||||
|
sTrace("sync event snapshot send to %s:%d begin seq:%d ack:%d send msg:%s", host, port, pSender->seq, pSender->ack,
|
||||||
|
msgStr);
|
||||||
taosMemoryFree(msgStr);
|
taosMemoryFree(msgStr);
|
||||||
|
|
||||||
syncSnapshotSendDestroy(pMsg);
|
syncSnapshotSendDestroy(pMsg);
|
||||||
|
@ -222,10 +226,15 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
|
||||||
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
|
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
|
||||||
|
|
||||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||||
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
|
||||||
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
|
if (pSender->seq == SYNC_SNAPSHOT_SEQ_END) {
|
||||||
sTrace("snapshot send finish seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
|
sTrace("sync event snapshot send to %s:%d finish seq:%d ack:%d send msg:%s", host, port, pSender->seq, pSender->ack,
|
||||||
|
msgStr);
|
||||||
} else {
|
} else {
|
||||||
sTrace("snapshot send sending seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
|
sTrace("sync event snapshot send to %s:%d sending seq:%d ack:%d send msg:%s", host, port, pSender->seq,
|
||||||
|
pSender->ack, msgStr);
|
||||||
}
|
}
|
||||||
taosMemoryFree(msgStr);
|
taosMemoryFree(msgStr);
|
||||||
|
|
||||||
|
@ -250,7 +259,11 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
|
||||||
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
|
syncNodeSendMsgById(&(pMsg->destId), pSender->pSyncNode, &rpcMsg);
|
||||||
|
|
||||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||||
sTrace("snapshot send resend seq:%d ack:%d send msg:%s", pSender->seq, pSender->ack, msgStr);
|
char host[128];
|
||||||
|
uint16_t port;
|
||||||
|
syncUtilU642Addr(pSender->pSyncNode->replicasId[pSender->replicaIndex].addr, host, sizeof(host), &port);
|
||||||
|
sTrace("sync event snapshot send to %s:%d resend seq:%d ack:%d send msg:%s", host, port, pSender->seq, pSender->ack,
|
||||||
|
msgStr);
|
||||||
taosMemoryFree(msgStr);
|
taosMemoryFree(msgStr);
|
||||||
|
|
||||||
syncSnapshotSendDestroy(pMsg);
|
syncSnapshotSendDestroy(pMsg);
|
||||||
|
@ -458,8 +471,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
needRsp = true;
|
needRsp = true;
|
||||||
|
|
||||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||||
sTrace("snapshot recv begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex,
|
sTrace("sync event snapshot recv begin ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack,
|
||||||
pMsg->lastTerm, msgStr);
|
pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
||||||
taosMemoryFree(msgStr);
|
taosMemoryFree(msgStr);
|
||||||
|
|
||||||
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
|
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_END) {
|
||||||
|
@ -474,7 +487,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
pSyncNode->pFsm->FpGetSnapshot(pSyncNode->pFsm, &snapshot);
|
||||||
sInfo(
|
sInfo(
|
||||||
"snapshot recv finish, update log begin index:%ld, snapshot.lastApplyIndex:%ld, "
|
"sync event snapshot recv finish, update log begin index:%ld, snapshot.lastApplyIndex:%ld, "
|
||||||
"snapshot.lastApplyTerm:%lu, raft log:%s",
|
"snapshot.lastApplyTerm:%lu, raft log:%s",
|
||||||
pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr);
|
pMsg->lastIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, logSimpleStr);
|
||||||
taosMemoryFree(logSimpleStr);
|
taosMemoryFree(logSimpleStr);
|
||||||
|
@ -485,8 +498,8 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
needRsp = true;
|
needRsp = true;
|
||||||
|
|
||||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||||
sTrace("snapshot recv end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack, pMsg->lastIndex,
|
sTrace("sync event snapshot recv end ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack,
|
||||||
pMsg->lastTerm, msgStr);
|
pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
||||||
taosMemoryFree(msgStr);
|
taosMemoryFree(msgStr);
|
||||||
|
|
||||||
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
|
} else if (pMsg->seq == SYNC_SNAPSHOT_SEQ_FORCE_CLOSE) {
|
||||||
|
@ -495,7 +508,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
needRsp = false;
|
needRsp = false;
|
||||||
|
|
||||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||||
sTrace("snapshot recv force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack,
|
sTrace("sync event snapshot recv force close ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack,
|
||||||
pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
||||||
|
|
||||||
taosMemoryFree(msgStr);
|
taosMemoryFree(msgStr);
|
||||||
|
@ -511,7 +524,7 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
|
||||||
needRsp = true;
|
needRsp = true;
|
||||||
|
|
||||||
char *msgStr = syncSnapshotSend2Str(pMsg);
|
char *msgStr = syncSnapshotSend2Str(pMsg);
|
||||||
sTrace("snapshot recv receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack,
|
sTrace("sync event snapshot recv receiving ack:%d, lastIndex:%ld, lastTerm:%lu, recv msg:%s", pReceiver->ack,
|
||||||
pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
pMsg->lastIndex, pMsg->lastTerm, msgStr);
|
||||||
taosMemoryFree(msgStr);
|
taosMemoryFree(msgStr);
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue