sync refactor

This commit is contained in:
Minghao Li 2022-03-10 16:54:26 +08:00
parent 901b5976cc
commit 73d768d273
6 changed files with 67 additions and 52 deletions

View File

@ -83,7 +83,7 @@ typedef struct SyncPing {
char data[];
} SyncPing;
#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
//#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t))
SyncPing* syncPingBuild(uint32_t dataLen);
void syncPingDestroy(SyncPing* pMsg);

View File

@ -46,8 +46,12 @@ char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len);
SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len);
cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry);
char* syncEntry2Str(const SSyncRaftEntry* pEntry);
void syncEntryPrint(const SSyncRaftEntry* pEntry);
void syncEntryPrint2(char *s, const SSyncRaftEntry* pEntry);
// for debug
void syncEntryPrint(const SSyncRaftEntry* pEntry);
void syncEntryPrint2(char* s, const SSyncRaftEntry* pEntry);
void syncEntryLog(const SSyncRaftEntry* pEntry);
void syncEntryLog2(char* s, const SSyncRaftEntry* pEntry);
#ifdef __cplusplus
}

View File

@ -32,39 +32,24 @@ typedef struct SSyncLogStoreData {
SWal* pWal;
} SSyncLogStoreData;
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode);
void logStoreDestory(SSyncLogStore* pLogStore);
// append one log entry
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
// get one log entry, user need to free pEntry->pCont
SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode);
void logStoreDestory(SSyncLogStore* pLogStore);
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index);
// truncate log with index, entries after the given index (>=index) will be deleted
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex);
// return index of last entry
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore);
// return term of last entry
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
// update log store commit index with "index"
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
// return commit index of log
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex);
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore);
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore);
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index);
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore);
SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore);
cJSON* logStore2Json(SSyncLogStore* pLogStore);
char* logStore2Str(SSyncLogStore* pLogStore);
cJSON* logStore2Json(SSyncLogStore* pLogStore);
char* logStore2Str(SSyncLogStore* pLogStore);
// for debug
void logStorePrint(SSyncLogStore* pLogStore);
void logStorePrint2(char* s, SSyncLogStore* pLogStore);
void logStoreLog(SSyncLogStore* pLogStore);
void logStoreLog2(char* s, SSyncLogStore* pLogStore);
#ifdef __cplusplus
}

View File

@ -150,7 +150,7 @@ SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock
// ---- message process SyncPing----
SyncPing* syncPingBuild(uint32_t dataLen) {
uint32_t bytes = SYNC_PING_FIX_LEN + dataLen;
uint32_t bytes = sizeof(SyncPing) + dataLen;
SyncPing* pMsg = malloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
@ -173,7 +173,7 @@ void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen) {
void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) {
memcpy(pMsg, buf, len);
assert(len == pMsg->bytes);
assert(pMsg->bytes == SYNC_PING_FIX_LEN + pMsg->dataLen);
assert(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen);
}
void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) {
@ -272,7 +272,7 @@ void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLe
void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) {
memcpy(pMsg, buf, len);
assert(len == pMsg->bytes);
assert(pMsg->bytes == SYNC_PING_FIX_LEN + pMsg->dataLen);
assert(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen);
}
void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg) {

View File

@ -104,14 +104,29 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) {
return serialized;
}
// for debug -----------
void syncEntryPrint(const SSyncRaftEntry* pEntry) {
char* s = syncEntry2Str(pEntry);
sTrace("%s", s);
free(s);
char* serialized = syncEntry2Str(pEntry);
printf("syncEntryPrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
void syncEntryPrint2(char* s, const SSyncRaftEntry* pEntry) {
char* ss = syncEntry2Str(pEntry);
sTrace("%s | %s", s, ss);
free(ss);
char* serialized = syncEntry2Str(pEntry);
printf("syncEntryPrint | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void syncEntryLog(const SSyncRaftEntry* pEntry) {
char* serialized = syncEntry2Str(pEntry);
sTrace("syncEntryPrint | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void syncEntryLog2(char* s, const SSyncRaftEntry* pEntry) {
char* serialized = syncEntry2Str(pEntry);
sTrace("syncEntryPrint | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}

View File

@ -43,7 +43,6 @@ void logStoreDestory(SSyncLogStore* pLogStore) {
}
}
// append one log entry
int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
@ -61,7 +60,6 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
free(serialized);
}
// get one log entry, user need to free pEntry->pCont
SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
@ -77,14 +75,12 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
return pEntry;
}
// truncate log with index, entries after the given index (>=index) will be deleted
int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
walRollback(pWal, fromIndex);
}
// return index of last entry
SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
@ -92,7 +88,6 @@ SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {
return lastIndex;
}
// return term of last entry
SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
SyncTerm lastTerm = 0;
SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore);
@ -103,14 +98,12 @@ SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {
return lastTerm;
}
// update log store commit index with "index"
int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
SSyncLogStoreData* pData = pLogStore->data;
SWal* pWal = pData->pWal;
walCommit(pWal, index);
}
// return commit index of log
SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {
SSyncLogStoreData* pData = pLogStore->data;
return pData->pSyncNode->commitIndex;
@ -163,11 +156,29 @@ char* logStore2Str(SSyncLogStore* pLogStore) {
return serialized;
}
// for debug
// for debug -----------------
void logStorePrint(SSyncLogStore* pLogStore) {
char* s = logStore2Str(pLogStore);
// sTrace("%s", s);
fprintf(stderr, "logStorePrint: [len:%lu]| %s \n", strlen(s), s);
char* serialized = logStore2Str(pLogStore);
printf("logStorePrint | len:%lu | %s \n", strlen(serialized), serialized);
fflush(NULL);
free(serialized);
}
free(s);
void logStorePrint2(char* s, SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore);
printf("logStorePrint | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
fflush(NULL);
free(serialized);
}
void logStoreLog(SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore);
sTrace("logStorePrint | len:%lu | %s", strlen(serialized), serialized);
free(serialized);
}
void logStoreLog2(char* s, SSyncLogStore* pLogStore) {
char* serialized = logStore2Str(pLogStore);
sTrace("logStorePrint | len:%lu | %s | %s", strlen(serialized), s, serialized);
free(serialized);
}