diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4fb3df3d9a..4ed64c43df 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -687,7 +687,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs code = ths->pLogStore->updateCommitIndex(ths->pLogStore, ths->commitIndex); ASSERT(code == 0); - code = syncNodeCommit(ths, beginIndex, endIndex, 0x11); + code = syncNodeCommit(ths, beginIndex, endIndex, ths->state); ASSERT(code == 0); } } diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 47e76dab4c..841ee15899 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -178,9 +178,14 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries if (pSender->start && pSender->term == ths->pRaftStore->currentTerm) { // already start sentryIndex = pSender->snapshot.lastApplyIndex; + sTrace("sending snapshot already start: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu", pSender->term, + ths->pRaftStore->currentTerm); } else { // start send snapshot, first time + sTrace("sending snapshot start first: pSender->term:%lu, ths->pRaftStore->currentTerm:%lu", pSender->term, + ths->pRaftStore->currentTerm); + snapshotSenderDoStart(pSender); pSender->start = true; sentryIndex = pSender->snapshot.lastApplyIndex; diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 74233ca4f3..f8d980d235 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -94,7 +94,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { // execute fsm if (pSyncNode->pFsm != NULL) { - int32_t code = syncNodeCommit(pSyncNode, beginIndex, endIndex, 0x1); + int32_t code = syncNodeCommit(pSyncNode, beginIndex, endIndex, pSyncNode->state); ASSERT(code == 0); #if 0 diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index c343ccc3a1..c22b59212e 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1558,7 +1558,8 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { assert(pEntry != NULL); if (ths->state == TAOS_SYNC_STATE_LEADER) { - ths->pLogStore->appendEntry(ths->pLogStore, pEntry); + // ths->pLogStore->appendEntry(ths->pLogStore, pEntry); + ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); syncNodeReplicate(ths); } @@ -1620,7 +1621,8 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { assert(pEntry != NULL); if (ths->state == TAOS_SYNC_STATE_LEADER) { - ths->pLogStore->appendEntry(ths->pLogStore, pEntry); + // ths->pLogStore->appendEntry(ths->pLogStore, pEntry); + ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); // start replicate right now! syncNodeReplicate(ths); @@ -1692,8 +1694,9 @@ const char* syncStr(ESyncState state) { } int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { - int32_t code = 0; - sInfo("sync commit from %ld to %ld, flag:0x%lX", beginIndex, endIndex, flag); + int32_t code = 0; + ESyncState state = flag; + sInfo("sync event commit from %ld to %ld, %s", beginIndex, endIndex, syncUtilState2String(state)); // maybe execute by leader, skip snapshot SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index a375152ae3..ce6734ba6b 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -156,6 +156,9 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr } walFsync(pWal, true); + + sTrace("sync event write wal: %ld", pEntry->index); + return code; } @@ -309,6 +312,8 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { // assert(code == 0); walFsync(pWal, true); + + sTrace("sync event old write wal: %ld", pEntry->index); return code; }