From 1a7891776766e2b358dabb4823aa9724764f70c2 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Mon, 14 Mar 2022 14:05:40 +0800 Subject: [PATCH] sync index --- source/libs/sync/inc/syncRaftLog.h | 3 ++ source/libs/sync/inc/syncUtil.h | 2 + source/libs/sync/src/syncMain.c | 54 +++++++++++----------- source/libs/sync/src/syncRaftLog.c | 2 +- source/libs/sync/src/syncReplication.c | 19 ++++++++ source/libs/sync/src/syncUtil.c | 10 ++++ source/libs/sync/test/syncLogStoreTest.cpp | 11 ++++- 7 files changed, 72 insertions(+), 29 deletions(-) diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index 59b5fa94db..d979e0df15 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -27,6 +27,9 @@ extern "C" { #include "syncRaftEntry.h" #include "taosdef.h" +#define SYNC_INDEX_BEGIN 0 +#define SYNC_INDEX_INVALID -1 + typedef struct SSyncLogStoreData { SSyncNode* pSyncNode; SWal* pWal; diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 2bbc1948dd..bc38acdfe6 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -52,6 +52,8 @@ const char* syncUtilState2String(ESyncState state); bool syncUtilCanPrint(char c); char* syncUtilprintBin(char* ptr, uint32_t len); char* syncUtilprintBin2(char* ptr, uint32_t len); +SyncIndex syncUtilMinIndex(SyncIndex a, SyncIndex b); +SyncIndex syncUtilMaxIndex(SyncIndex a, SyncIndex b); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 860dd96cdf..63fb4062e0 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -327,33 +327,6 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { return serialized; } -// for debug -------------- -void syncNodePrint(SSyncNode* pObj) { - char* serialized = syncNode2Str(pObj); - printf("syncNodePrint | len:%lu | %s \n", strlen(serialized), serialized); - fflush(NULL); - free(serialized); -} - -void syncNodePrint2(char* s, SSyncNode* pObj) { - char* serialized = syncNode2Str(pObj); - printf("syncNodePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); - fflush(NULL); - free(serialized); -} - -void syncNodeLog(SSyncNode* pObj) { - char* serialized = syncNode2Str(pObj); - sTrace("syncNodeLog | len:%lu | %s", strlen(serialized), serialized); - free(serialized); -} - -void syncNodeLog2(char* s, SSyncNode* pObj) { - char* serialized = syncNode2Str(pObj); - sTrace("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); - free(serialized); -} - int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); @@ -491,6 +464,33 @@ int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { return 0; } +// for debug -------------- +void syncNodePrint(SSyncNode* pObj) { + char* serialized = syncNode2Str(pObj); + printf("syncNodePrint | len:%lu | %s \n", strlen(serialized), serialized); + fflush(NULL); + free(serialized); +} + +void syncNodePrint2(char* s, SSyncNode* pObj) { + char* serialized = syncNode2Str(pObj); + printf("syncNodePrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + free(serialized); +} + +void syncNodeLog(SSyncNode* pObj) { + char* serialized = syncNode2Str(pObj); + sTrace("syncNodeLog | len:%lu | %s", strlen(serialized), serialized); + free(serialized); +} + +void syncNodeLog2(char* s, SSyncNode* pObj) { + char* serialized = syncNode2Str(pObj); + sTrace("syncNodeLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); + free(serialized); +} + // ------ local funciton --------- static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { int32_t ret = 0; diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 27c8a26154..6ebeba1991 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -130,7 +130,7 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) { cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); cJSON_AddStringToObject(pRoot, "pWal", u64buf); - snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastIndex(pLogStore)); + snprintf(u64buf, sizeof(u64buf), "%ld", logStoreLastIndex(pLogStore)); cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore)); cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index dfbe5db0ed..d4c57630ad 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -14,7 +14,9 @@ */ #include "syncReplication.h" +#include "syncIndexMgr.h" #include "syncMessage.h" +#include "syncRaftEntry.h" // TLA+ Spec // AppendEntries(i, j) == @@ -42,7 +44,24 @@ // /\ UNCHANGED <> // int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { + assert(pSyncNode->state == TAOS_SYNC_STATE_LEADER); + int32_t ret = 0; + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SRaftId* pDestId = &(pSyncNode->peersId[i]); + SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId); + SyncIndex preLogIndex = nextIndex - 1; + SyncTerm preLogTerm = 0; + if (preLogIndex >= 0) { + SSyncRaftEntry* pPreEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, preLogIndex); + preLogTerm = pPreEntry->term; + } else { + preLogTerm = 0; + } + // SyncTerm lastIndex = + // pSyncNode->pLogStore->getLastIndex < nextIndex ? pSyncNode->pLogStore->getLastIndex : nextIndex; + } + return ret; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index b78971bf37..4e56a9670f 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -184,4 +184,14 @@ char* syncUtilprintBin2(char* ptr, uint32_t len) { p += n; } return s; +} + +SyncIndex syncUtilMinIndex(SyncIndex a, SyncIndex b) { + SyncIndex r = a < b ? a : b; + return r; +} + +SyncIndex syncUtilMaxIndex(SyncIndex a, SyncIndex b) { + SyncIndex r = a > b ? a : b; + return r; } \ No newline at end of file diff --git a/source/libs/sync/test/syncLogStoreTest.cpp b/source/libs/sync/test/syncLogStoreTest.cpp index 602fdee8c2..1b05f76fa2 100644 --- a/source/libs/sync/test/syncLogStoreTest.cpp +++ b/source/libs/sync/test/syncLogStoreTest.cpp @@ -81,7 +81,10 @@ SSyncNode* syncNodeInit() { SSyncNode* syncInitTest() { return syncNodeInit(); } void logStoreTest() { - logStorePrint(pSyncNode->pLogStore); + logStorePrint2((char*)"logStoreTest2", pSyncNode->pLogStore); + + assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_INVALID); + for (int i = 0; i < 5; ++i) { int32_t dataLen = 10; SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); @@ -97,6 +100,10 @@ void logStoreTest() { // syncEntryPrint2((char*)"write entry:", pEntry); pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry); syncEntryDestory(pEntry); + + if (i == 0) { + assert(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) == SYNC_INDEX_BEGIN); + } } logStorePrint(pSyncNode->pLogStore); @@ -129,6 +136,8 @@ int main(int argc, char** argv) { ret = syncEnvStart(); assert(ret == 0); + taosRemoveDir("./wal_test"); + pSyncNode = syncInitTest(); assert(pSyncNode != NULL);