sync refactor

This commit is contained in:
Minghao Li 2022-03-09 20:04:39 +08:00
parent d083c79744
commit 30be9d3e31
2 changed files with 61 additions and 8 deletions

View File

@ -40,7 +40,7 @@ void logStoreDestory(SSyncLogStore* pLogStore);
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
// get one log entry, user need to free pEntry->pCont
int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry* pEntry);
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index);
// truncate log with index, entries after the given index (>=index) will be deleted
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex);
@ -57,6 +57,8 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
// return commit index of log
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
cJSON* logStore2Json(SSyncLogStore* pLogStore);
char* logStore2Str(SSyncLogStore* pLogStore);

View File

@ -44,25 +44,76 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
}
// append one log entry
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {}
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
char* buf = malloc(pEntry->bytes);
syncEntrySerialize(pEntry, buf, pEntry->bytes);
walWrite(pWal, pEntry->index, pEntry->msgType, buf, pEntry->bytes);
walFsync(pWal, true);
free(buf);
}
// get one log entry, user need to free pEntry->pCont
int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry* pEntry) {}
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SSyncRaftEntry* pEntry;
SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
walReadWithHandle(pWalHandle, index);
// need to hold, do not new every time!!
walCloseReadHandle(pWalHandle);
return pEntry;
}
// truncate log with index, entries after the given index (>=index) will be deleted
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {}
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
walRollback(pWal, fromIndex);
}
// return index of last entry
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {}
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore);
SyncIndex lastIndex = pLastEntry->index;
free(pLastEntry);
return lastIndex;
}
// return term of last entry
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {}
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore);
SyncTerm lastTerm = pLastEntry->term;
free(pLastEntry);
return lastTerm;
}
// update log store commit index with "index"
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {}
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
walCommit(pWal, index);
}
// return commit index of log
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {}
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
return pData->pSyncNode->commitIndex;
}
SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
SyncIndex lastIndex = walGetLastVer(pWal);
SSyncRaftEntry* pEntry;
pEntry = logStoreGetEntry(pLogStore, lastIndex);
return pEntry;
}
cJSON* logStore2Json(SSyncLogStore* pLogStore) {}