diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index d351dc50f4..d58ae83dd3 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -221,7 +221,6 @@ void syncNodeVoteForSelf(SSyncNode* pSyncNode); // snapshot -------------- bool syncNodeHasSnapshot(SSyncNode* pSyncNode); -bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index); SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode); SyncTerm syncNodeGetLastTerm(SSyncNode* pSyncNode); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index ba3dd66550..a10b81d165 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -860,7 +860,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs } code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); - ASSERT(code == 0); + if (code != 0) { + return -1; + } // pre commit code = syncNodePreCommit(ths, pAppendEntry); @@ -971,7 +973,9 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs ASSERT(pAppendEntry != NULL); code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry); - ASSERT(code == 0); + if (code != 0) { + return -1; + } // pre commit code = syncNodePreCommit(ths, pAppendEntry); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index f6768bf494..cc2b5a7706 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -1935,19 +1935,6 @@ bool syncNodeHasSnapshot(SSyncNode* pSyncNode) { return ret; } -#if 0 -bool syncNodeIsIndexInSnapshot(SSyncNode* pSyncNode, SyncIndex index) { - ASSERT(syncNodeHasSnapshot(pSyncNode)); - ASSERT(pSyncNode->pFsm->FpGetSnapshotInfo != NULL); - ASSERT(index >= SYNC_INDEX_BEGIN); - - SSnapshot snapshot; - pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); - bool b = (index <= snapshot.lastApplyIndex); - return b; -} -#endif - SyncIndex syncNodeGetLastIndex(SSyncNode* pSyncNode) { SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0}; if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { @@ -2004,21 +1991,6 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) { return preIndex; } -/* -SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index) { - ASSERT(index >= SYNC_INDEX_BEGIN); - - SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); - if (index > syncStartIndex) { - syncNodeLog3("syncNodeGetPreIndex", pSyncNode); - ASSERT(0); - } - - SyncIndex preIndex = index - 1; - return preIndex; -} -*/ - SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { if (index < SYNC_INDEX_BEGIN) { return SYNC_TERM_INVALID; @@ -2056,112 +2028,6 @@ SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { return SYNC_TERM_INVALID; } -#if 0 -SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { - ASSERT(index >= SYNC_INDEX_BEGIN); - - SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); - if (index > syncStartIndex) { - syncNodeLog3("syncNodeGetPreTerm", pSyncNode); - ASSERT(0); - } - - if (index == SYNC_INDEX_BEGIN) { - return 0; - } - - SyncTerm preTerm = 0; - SyncIndex preIndex = index - 1; - SSyncRaftEntry* pPreEntry = NULL; - int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, preIndex, &pPreEntry); - if (code == 0) { - ASSERT(pPreEntry != NULL); - preTerm = pPreEntry->term; - taosMemoryFree(pPreEntry); - return preTerm; - } else { - if (terrno == TSDB_CODE_WAL_LOG_NOT_EXIST) { - SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; - if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { - pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); - if (snapshot.lastApplyIndex == preIndex) { - return snapshot.lastApplyTerm; - } - } - } - } - - ASSERT(0); - return -1; -} -#endif - -#if 0 -SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index) { - ASSERT(index >= SYNC_INDEX_BEGIN); - - SyncIndex syncStartIndex = syncNodeSyncStartIndex(pSyncNode); - if (index > syncStartIndex) { - syncNodeLog3("syncNodeGetPreTerm", pSyncNode); - ASSERT(0); - } - - if (index == SYNC_INDEX_BEGIN) { - return 0; - } - - SyncTerm preTerm = 0; - if (syncNodeHasSnapshot(pSyncNode)) { - // has snapshot - SSnapshot snapshot = {.data = NULL, .lastApplyIndex = -1, .lastApplyTerm = 0, .lastConfigIndex = -1}; - if (pSyncNode->pFsm->FpGetSnapshotInfo != NULL) { - pSyncNode->pFsm->FpGetSnapshotInfo(pSyncNode->pFsm, &snapshot); - } - - if (index > snapshot.lastApplyIndex + 1) { - // should be log preTerm - SSyncRaftEntry* pPreEntry = NULL; - int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index - 1, &pPreEntry); - ASSERT(code == 0); - ASSERT(pPreEntry != NULL); - - preTerm = pPreEntry->term; - taosMemoryFree(pPreEntry); - - } else if (index == snapshot.lastApplyIndex + 1) { - preTerm = snapshot.lastApplyTerm; - - } else { - // maybe snapshot change - sError("sync get pre term, bad scene. index:%ld", index); - logStoreLog2("sync get pre term, bad scene", pSyncNode->pLogStore); - - SSyncRaftEntry* pPreEntry = NULL; - int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index - 1, &pPreEntry); - ASSERT(code == 0); - ASSERT(pPreEntry != NULL); - - preTerm = pPreEntry->term; - taosMemoryFree(pPreEntry); - } - - } else { - // no snapshot - ASSERT(index > SYNC_INDEX_BEGIN); - - SSyncRaftEntry* pPreEntry = NULL; - int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index - 1, &pPreEntry); - ASSERT(code == 0); - ASSERT(pPreEntry != NULL); - - preTerm = pPreEntry->term; - taosMemoryFree(pPreEntry); - } - - return preTerm; -} -#endif - // get pre index and term of "index" int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm) { *pPreIndex = syncNodeGetPreIndex(pSyncNode, index); @@ -2351,8 +2217,8 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths) { ASSERT(pEntry != NULL); if (ths->state == TAOS_SYNC_STATE_LEADER) { - // ths->pLogStore->appendEntry(ths->pLogStore, pEntry); - ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); + int32_t code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); + ASSERT(code == 0); syncNodeReplicate(ths); } @@ -2406,6 +2272,7 @@ int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { // int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) { int32_t ret = 0; + int32_t code = 0; syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore); @@ -2414,18 +2281,24 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI ASSERT(pEntry != NULL); if (ths->state == TAOS_SYNC_STATE_LEADER) { - // ths->pLogStore->appendEntry(ths->pLogStore, pEntry); - ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); + // append entry + code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry); + if (code != 0) { + // del resp mgr, call FpCommitCb + ASSERT(0); + return -1; + } - // start replicate right now! - syncNodeReplicate(ths); + // if mulit replica, start replicate right now + if (ths->replicaNum > 1) { + syncNodeReplicate(ths); + } // pre commit SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta = {0}; cbMeta.index = pEntry->index; @@ -2439,8 +2312,10 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI } rpcFreeCont(rpcMsg.pCont); - // only myself, maybe commit - syncMaybeAdvanceCommitIndex(ths); + // if only myself, maybe commit right now + if (ths->replicaNum == 1) { + syncMaybeAdvanceCommitIndex(ths); + } } else { // pre commit @@ -2448,7 +2323,6 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI syncEntry2OriginalRpc(pEntry, &rpcMsg); if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pEntry->originalRpcType != TDMT_SYNC_NOOP) { if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { SFsmCbMeta cbMeta = {0}; cbMeta.index = pEntry->index; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index a574c8cc1e..a026892629 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -168,6 +168,9 @@ static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) { return lastVer + 1; } +// if success, return last term +// if not log, return 0 +// if error, return SYNC_TERM_INVALID static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -176,15 +179,17 @@ static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore) { } else { SSyncRaftEntry* pLastEntry; int32_t code = raftLogGetLastEntry(pLogStore, &pLastEntry); - ASSERT(code == 0); - ASSERT(pLastEntry != NULL); - - SyncTerm lastTerm = pLastEntry->term; - taosMemoryFree(pLastEntry); - return lastTerm; + if (code == 0 && pLastEntry != NULL) { + SyncTerm lastTerm = pLastEntry->term; + taosMemoryFree(pLastEntry); + return lastTerm; + } else { + return SYNC_TERM_INVALID; + } } - return 0; + // can not be here! + return SYNC_TERM_INVALID; } static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { @@ -218,16 +223,21 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr ASSERT(0); } - walFsync(pWal, true); + // walFsync(pWal, true); - char eventLog[128]; - snprintf(eventLog, sizeof(eventLog), "write index:%ld, type:%s,%d, type2:%s,%d", pEntry->index, - TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); - syncNodeEventLog(pData->pSyncNode, eventLog); + do { + char eventLog[128]; + snprintf(eventLog, sizeof(eventLog), "write index:%ld, type:%s,%d, type2:%s,%d", pEntry->index, + TMSG_INFO(pEntry->msgType), pEntry->msgType, TMSG_INFO(pEntry->originalRpcType), pEntry->originalRpcType); + syncNodeEventLog(pData->pSyncNode, eventLog); + } while (0); return code; } +// entry found, return 0 +// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST +// other error, return -1 static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -238,6 +248,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); SWalReadHandle* pWalHandle = pData->pWalHandle; if (pWalHandle == NULL) { + terrno = TSDB_CODE_SYN_INTERNAL_ERROR; return -1; } @@ -309,6 +320,9 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn return code; } +// entry found, return 0 +// entry not found, return -1, terrno = TSDB_CODE_WAL_LOG_NOT_EXIST +// other error, return -1 static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; @@ -320,7 +334,8 @@ static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** pp return -1; } else { SyncIndex lastIndex = raftLogLastIndex(pLogStore); - int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry); + ASSERT(lastIndex >= SYNC_INDEX_BEGIN); + int32_t code = raftLogGetEntry(pLogStore, lastIndex, ppLastEntry); return code; } @@ -356,7 +371,7 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { ASSERT(0); } - walFsync(pWal, true); + // walFsync(pWal, true); char eventLog[128]; snprintf(eventLog, sizeof(eventLog), "old write index:%ld, type:%s,%d, type2:%s,%d", pEntry->index, diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 95dec6cb83..d272e0175f 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -156,6 +156,10 @@ static bool syncNodeOnRequestVoteLogOK(SSyncNode* pSyncNode, SyncRequestVote* pM SyncTerm myLastTerm = syncNodeGetLastTerm(pSyncNode); SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode); + if (myLastTerm == SYNC_TERM_INVALID) { + return false; + } + if (pMsg->lastLogTerm > myLastTerm) { return true; }