From 30be9d3e314523d0a599dc88048e6a62a57fd610 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 9 Mar 2022 20:04:39 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/inc/syncRaftLog.h | 4 +- source/libs/sync/src/syncRaftLog.c | 65 ++++++++++++++++++++++++++---- 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index 634d06903c..ecdb544302 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -40,7 +40,7 @@ void logStoreDestory(SSyncLogStore* pLogStore); int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); // get one log entry, user need to free pEntry->pCont -int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry* pEntry); +SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index); // truncate log with index, entries after the given index (>=index) will be deleted int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); @@ -57,6 +57,8 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); // return commit index of log SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); +SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); + cJSON* logStore2Json(SSyncLogStore* pLogStore); char* logStore2Str(SSyncLogStore* pLogStore); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index c4e1feb374..d5735d9142 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -44,25 +44,76 @@ void logStoreDestory(SSyncLogStore* pLogStore) { } // append one log entry -int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {} +int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + char* buf = malloc(pEntry->bytes); + + syncEntrySerialize(pEntry, buf, pEntry->bytes); + walWrite(pWal, pEntry->index, pEntry->msgType, buf, pEntry->bytes); + walFsync(pWal, true); + + free(buf); +} // get one log entry, user need to free pEntry->pCont -int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry* pEntry) {} +SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + SSyncRaftEntry* pEntry; + + SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); + walReadWithHandle(pWalHandle, index); + + // need to hold, do not new every time!! + walCloseReadHandle(pWalHandle); + return pEntry; +} // truncate log with index, entries after the given index (>=index) will be deleted -int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {} +int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + walRollback(pWal, fromIndex); +} // return index of last entry -SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {} +SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) { + SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore); + SyncIndex lastIndex = pLastEntry->index; + free(pLastEntry); + return lastIndex; +} // return term of last entry -SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {} +SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { + SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore); + SyncTerm lastTerm = pLastEntry->term; + free(pLastEntry); + return lastTerm; +} // update log store commit index with "index" -int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {} +int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + walCommit(pWal, index); +} // return commit index of log -SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {} +SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) { + SSyncLogStoreData* pData = pLogStore->data; + return pData->pSyncNode->commitIndex; +} + +SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + SyncIndex lastIndex = walGetLastVer(pWal); + SSyncRaftEntry* pEntry; + pEntry = logStoreGetEntry(pLogStore, lastIndex); + return pEntry; +} cJSON* logStore2Json(SSyncLogStore* pLogStore) {}