From 889d1339e598d8dd4cae3e5bd7a9410c6563157d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 10 Mar 2022 16:18:16 +0800 Subject: [PATCH] sync refactor --- source/libs/sync/inc/syncRaftEntry.h | 1 + source/libs/sync/src/syncRaftEntry.c | 6 ++++ source/libs/sync/src/syncRaftLog.c | 34 ++++++++++++---------- source/libs/sync/test/syncLogStoreTest.cpp | 33 +++++++++++++-------- 4 files changed, 47 insertions(+), 27 deletions(-) diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index b607849dc8..be25675db4 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -47,6 +47,7 @@ SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry); char* syncEntry2Str(const SSyncRaftEntry* pEntry); void syncEntryPrint(const SSyncRaftEntry* pEntry); +void syncEntryPrint2(char *s, const SSyncRaftEntry* pEntry); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 7774cbb0ce..959bf49ee7 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -108,4 +108,10 @@ void syncEntryPrint(const SSyncRaftEntry* pEntry) { char* s = syncEntry2Str(pEntry); sTrace("%s", s); free(s); +} + +void syncEntryPrint2(char* s, const SSyncRaftEntry* pEntry) { + char* ss = syncEntry2Str(pEntry); + sTrace("%s | %s", s, ss); + free(ss); } \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index d8a460629b..d177d3ac9b 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -53,9 +53,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { char* serialized = syncEntrySerialize(pEntry, &len); assert(serialized != NULL); - walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len); - walFsync(pWal, true); + int code; + code = walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len); + assert(code == 0); + walFsync(pWal, true); free(serialized); } @@ -84,23 +86,20 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { // return index of last entry SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) { - /* - SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore); - SyncIndex lastIndex = pLastEntry->index; - free(pLastEntry); - */ SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - int64_t last = walGetLastVer(pWal); - SyncIndex lastIndex = last < 0 ? 0 : last; + SyncIndex lastIndex = walGetLastVer(pWal); return lastIndex; } // return term of last entry SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { + SyncTerm lastTerm = 0; SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore); - SyncTerm lastTerm = pLastEntry->term; - free(pLastEntry); + if (pLastEntry != NULL) { + lastTerm = pLastEntry->term; + free(pLastEntry); + } return lastTerm; } @@ -121,8 +120,11 @@ SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; SyncIndex lastIndex = walGetLastVer(pWal); - SSyncRaftEntry* pEntry; - pEntry = logStoreGetEntry(pLogStore, lastIndex); + + SSyncRaftEntry* pEntry = NULL; + if (lastIndex > 0) { + pEntry = logStoreGetEntry(pLogStore, lastIndex); + } return pEntry; } @@ -143,7 +145,7 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) { cJSON* pEntries = cJSON_CreateArray(); cJSON_AddItemToObject(pRoot, "pEntries", pEntries); SyncIndex lastIndex = logStoreLastIndex(pLogStore); - for (SyncIndex i = 1; i <= lastIndex; ++i) { + for (SyncIndex i = 0; i <= lastIndex; ++i) { SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i); cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry)); syncEntryDestory(pEntry); @@ -164,6 +166,8 @@ char* logStore2Str(SSyncLogStore* pLogStore) { // for debug void logStorePrint(SSyncLogStore* pLogStore) { char* s = logStore2Str(pLogStore); - sTrace("%s", s); + // sTrace("%s", s); + fprintf(stderr, "logStorePrint: [len:%lu]| %s \n", strlen(s), s); + free(s); } \ No newline at end of file diff --git a/source/libs/sync/test/syncLogStoreTest.cpp b/source/libs/sync/test/syncLogStoreTest.cpp index a859186bbe..a5eb748de6 100644 --- a/source/libs/sync/test/syncLogStoreTest.cpp +++ b/source/libs/sync/test/syncLogStoreTest.cpp @@ -35,16 +35,19 @@ SSyncNode* syncNodeInit() { syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); + int code = walInit(); + assert(code == 0); SWalCfg walCfg; memset(&walCfg, 0, sizeof(SWalCfg)); walCfg.vgId = syncInfo.vgId; walCfg.fsyncPeriod = 1000; walCfg.retentionPeriod = 1000; walCfg.rollPeriod = 1000; - walCfg.retentionSize = 100000; - walCfg.segSize = 100000; - walCfg.level = TAOS_WAL_WRITE; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; pWal = walOpen("./wal_test", &walCfg); + assert(pWal != NULL); syncInfo.pWal = pWal; @@ -80,8 +83,20 @@ SSyncNode* syncInitTest() { return syncNodeInit(); } void logStoreTest() { logStorePrint(pSyncNode->pLogStore); for (int i = 0; i < 5; ++i) { - SSyncRaftEntry* pEntry; + int32_t dataLen = 10; + SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); + assert(pEntry != NULL); + pEntry->msgType = 1; + pEntry->originalRpcType = 2; + pEntry->seqNum = 3; + pEntry->isWeak = true; + pEntry->term = 100; + pEntry->index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1; + snprintf(pEntry->data, dataLen, "value%d", i); + + //syncEntryPrint2((char*)"write entry:", pEntry); pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry); + syncEntryDestory(pEntry); } logStorePrint(pSyncNode->pLogStore); @@ -117,16 +132,10 @@ int main(int argc, char** argv) { pSyncNode = syncInitTest(); assert(pSyncNode != NULL); - syncNodePrint((char*)"syncLogStoreTest", pSyncNode); - - initRaftId(pSyncNode); - - //-------------------------------------------------------------- + //syncNodePrint((char*)"syncLogStoreTest", pSyncNode); + //initRaftId(pSyncNode); logStoreTest(); - //-------------------------------------------------------------- - // walClose(pWal); - return 0; }