From 8141c430bcb0896e8a8983c3eb4fe5accd22c23e Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 27 May 2022 15:09:17 +0800 Subject: [PATCH 1/6] enh(sync) snapshot sender, receiver --- source/libs/sync/inc/syncSnapshot.h | 25 +++++++++++++++++++++++-- source/libs/sync/src/syncSnapshot.c | 12 ++++++++++-- 2 files changed, 33 insertions(+), 4 deletions(-) diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index fd2119ce65..104da48616 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -26,8 +26,29 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" -int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot); -int32_t restoreSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot); +typedef struct SSyncSnapshotSender { + bool isStart; + int32_t progressIndex; + void * pCurrentBlock; + int32_t len; + SSnapshot *pSnapshot; +} SSyncSnapshotSender; + +int32_t snapshotSenderStart(SSyncSnapshotSender *pSender); +int32_t snapshotSenderStop(SSyncSnapshotSender *pSender); +int32_t snapshotSend(SSyncSnapshotSender *pSender); + +typedef struct SSyncSnapshotReceiver { + bool isStart; + int32_t progressIndex; + void * pCurrentBlock; + int32_t len; + SSnapshot *pSnapshot; +} SSyncSnapshotReceiver; + +int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); +int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); +int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 42b2bd993b..5419adee2c 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -15,6 +15,14 @@ #include "syncSnapshot.h" -int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } +int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { return 0; } -int32_t restoreSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } \ No newline at end of file +int32_t snapshotSenderStop(SSyncSnapshotSender *pSender) { return 0; } + +int32_t snapshotSend(SSyncSnapshotSender *pSender) { return 0; } + +int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) { return 0; } + +int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { return 0; } + +int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver) { return 0; } From b7be03898c5061c903d7b4117ffba6c0b71875ff Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 27 May 2022 15:35:26 +0800 Subject: [PATCH 2/6] enh(sync) snapshot sender, receiver --- source/libs/sync/inc/syncInt.h | 8 ++++++-- source/libs/sync/inc/syncSnapshot.h | 19 +++++++++++++------ source/libs/sync/src/syncAppendEntries.c | 4 ++-- source/libs/sync/src/syncSnapshot.c | 16 ++++++++++++---- .../libs/sync/test/syncConfigChangeTest.cpp | 3 ++- 5 files changed, 35 insertions(+), 15 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 2e71745f61..4100aa0216 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -55,6 +55,8 @@ typedef struct SVotesRespond SVotesRespond; typedef struct SSyncIndexMgr SSyncIndexMgr; typedef struct SRaftCfg SRaftCfg; typedef struct SSyncRespMgr SSyncRespMgr; +typedef struct SSyncSnapshotSender SSyncSnapshotSender; +typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver; typedef struct SSyncNode { // init by SSyncInfo @@ -148,9 +150,11 @@ typedef struct SSyncNode { SSyncRespMgr* pSyncRespMgr; // restore state - bool restoreFinish; // sem_t restoreSem; - SSnapshot* pSnapshot; + bool restoreFinish; + SSnapshot* pSnapshot; + SSyncSnapshotSender* pSender; + SSyncSnapshotReceiver* pReceiver; } SSyncNode; diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 104da48616..b3174a4b36 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "cJSON.h" #include "syncInt.h" #include "taosdef.h" @@ -32,11 +33,14 @@ typedef struct SSyncSnapshotSender { void * pCurrentBlock; int32_t len; SSnapshot *pSnapshot; + SSyncNode *pSyncNode; } SSyncSnapshotSender; -int32_t snapshotSenderStart(SSyncSnapshotSender *pSender); -int32_t snapshotSenderStop(SSyncSnapshotSender *pSender); -int32_t snapshotSend(SSyncSnapshotSender *pSender); +SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode); +void snapshotSenderDestroy(SSyncSnapshotSender *pSender); +int32_t snapshotSend(SSyncSnapshotSender *pSender); +cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); +char * snapshotSender2Str(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { bool isStart; @@ -44,11 +48,14 @@ typedef struct SSyncSnapshotReceiver { void * pCurrentBlock; int32_t len; SSnapshot *pSnapshot; + SSyncNode *pSyncNode; } SSyncSnapshotReceiver; -int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); -int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); -int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver); +SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode); +void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); +int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver); +cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); +char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 008bc00dbc..ca34e3ea77 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -365,7 +365,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } SReConfigCbMeta cbMeta = {0}; - bool isDrop; + bool isDrop; // I am in newConfig if (hit) { @@ -388,7 +388,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } // always call FpReConfigCb - if (ths->pFsm->FpReConfigCb != NULL) { + if (ths->pFsm->FpReConfigCb != NULL) { cbMeta.code = 0; cbMeta.currentTerm = ths->pRaftStore->currentTerm; cbMeta.index = pEntry->index; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 5419adee2c..ccb0e6071b 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -15,14 +15,22 @@ #include "syncSnapshot.h" -int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { return 0; } +SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode) { return NULL; } -int32_t snapshotSenderStop(SSyncSnapshotSender *pSender) { return 0; } +void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {} int32_t snapshotSend(SSyncSnapshotSender *pSender) { return 0; } -int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) { return 0; } +cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { return NULL; } -int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { return 0; } +char *snapshotSender2Str(SSyncSnapshotSender *pSender) { return NULL; } + +SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode) { return NULL; } + +void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {} int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver) { return 0; } + +cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { return NULL; } + +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { return NULL; } diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 7efc3f50c0..1755b7a8fd 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -78,7 +78,8 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { - sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); + sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", + cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); } SSyncFSM* createFsm() { From d45221c8630d9d5dca1c69d59b87b356d6813c37 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 27 May 2022 15:41:19 +0800 Subject: [PATCH 3/6] enh(sync) logStoreGetEntry -> pLogStore->getEntry --- source/libs/sync/src/syncAppendEntries.c | 6 +++--- source/libs/sync/src/syncReplication.c | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index ca34e3ea77..46be7597c7 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -107,7 +107,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { SyncTerm localPreLogTerm = 0; if (pMsg->prevLogIndex >= SYNC_INDEX_BEGIN && pMsg->prevLogIndex <= ths->pLogStore->getLastIndex(ths->pLogStore)) { - SSyncRaftEntry* pEntry = logStoreGetEntry(ths->pLogStore, pMsg->prevLogIndex); + SSyncRaftEntry* pEntry = ths->pLogStore->getEntry(ths->pLogStore, pMsg->prevLogIndex); assert(pEntry != NULL); localPreLogTerm = pEntry->term; syncEntryDestory(pEntry); @@ -175,7 +175,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { bool conflict = false; SyncIndex extraIndex = pMsg->prevLogIndex + 1; - SSyncRaftEntry* pExtraEntry = logStoreGetEntry(ths->pLogStore, extraIndex); + SSyncRaftEntry* pExtraEntry = ths->pLogStore->getEntry(ths->pLogStore, extraIndex); assert(pExtraEntry != NULL); SSyncRaftEntry* pAppendEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); @@ -197,7 +197,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { // notice! reverse roll back! for (SyncIndex index = delEnd; index >= delBegin; --index) { if (ths->pFsm->FpRollBackCb != NULL) { - SSyncRaftEntry* pRollBackEntry = logStoreGetEntry(ths->pLogStore, index); + SSyncRaftEntry* pRollBackEntry = ths->pLogStore->getEntry(ths->pLogStore, index); assert(pRollBackEntry != NULL); // if (pRollBackEntry->msgType != TDMT_VND_SYNC_NOOP) { diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 2fdb8a0e17..d17e64d936 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -75,7 +75,7 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) { // SyncIndex lastIndex = syncUtilMinIndex(pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore), nextIndex); SyncAppendEntries* pMsg = NULL; - SSyncRaftEntry* pEntry = logStoreGetEntry(pSyncNode->pLogStore, nextIndex); + SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, nextIndex); if (pEntry != NULL) { pMsg = syncAppendEntriesBuild(pEntry->bytes, pSyncNode->vgId); assert(pMsg != NULL); From 54717925496ab4354e56377820076264ab7734e7 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 27 May 2022 16:13:02 +0800 Subject: [PATCH 4/6] refactor: make logStoreGetLastEntry static --- source/libs/sync/inc/syncRaftLog.h | 4 +++- source/libs/sync/src/syncRaftLog.c | 3 +++ 2 files changed, 6 insertions(+), 1 deletion(-) diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index 7db62e14d5..c3082bc177 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -41,12 +41,14 @@ SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); -SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); + cJSON* logStore2Json(SSyncLogStore* pLogStore); char* logStore2Str(SSyncLogStore* pLogStore); cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore); char* logStoreSimple2Str(SSyncLogStore* pLogStore); +// SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); + // for debug void logStorePrint(SSyncLogStore* pLogStore); void logStorePrint2(char* s, SSyncLogStore* pLogStore); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 07a9397a58..763a287bd8 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -16,6 +16,9 @@ #include "syncRaftLog.h" #include "wal.h" +static SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); + + SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore)); assert(pLogStore != NULL); From 5a754023f55ef87f65bd71543a5a0e06c9736efa Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 27 May 2022 16:36:44 +0800 Subject: [PATCH 5/6] refactor: make logStore static --- source/libs/sync/inc/syncRaftLog.h | 15 +++++++-------- source/libs/sync/src/syncRaftLog.c | 7 +++++++ 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index c3082bc177..0461c8f535 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -34,20 +34,19 @@ typedef struct SSyncLogStoreData { SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); void logStoreDestory(SSyncLogStore* pLogStore); -int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); -SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index); -int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); -SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); -SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); -int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); -SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); - cJSON* logStore2Json(SSyncLogStore* pLogStore); char* logStore2Str(SSyncLogStore* pLogStore); cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore); char* logStoreSimple2Str(SSyncLogStore* pLogStore); // SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); +// SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); +// SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); +// SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index); +// int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); +// int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); +// int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); +// SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); // for debug void logStorePrint(SSyncLogStore* pLogStore); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 763a287bd8..997129f8a1 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -17,6 +17,13 @@ #include "wal.h" static SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); +static SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); +static SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); +static SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index); +static int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); +static int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); +static int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); +static SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { From 3a1be0c58a9a2f3022bf0b609d64657659934010 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Fri, 27 May 2022 18:00:09 +0800 Subject: [PATCH 6/6] refactor: make logStore static --- source/libs/sync/inc/syncRaftLog.h | 12 ++++++------ source/libs/sync/src/syncRaftLog.c | 5 +++-- 2 files changed, 9 insertions(+), 8 deletions(-) diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index 0461c8f535..df5cd3f36c 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -32,12 +32,12 @@ typedef struct SSyncLogStoreData { SWal* pWal; } SSyncLogStoreData; -SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); -void logStoreDestory(SSyncLogStore* pLogStore); -cJSON* logStore2Json(SSyncLogStore* pLogStore); -char* logStore2Str(SSyncLogStore* pLogStore); -cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore); -char* logStoreSimple2Str(SSyncLogStore* pLogStore); +SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); +void logStoreDestory(SSyncLogStore* pLogStore); +cJSON* logStore2Json(SSyncLogStore* pLogStore); +char* logStore2Str(SSyncLogStore* pLogStore); +cJSON* logStoreSimple2Json(SSyncLogStore* pLogStore); +char* logStoreSimple2Str(SSyncLogStore* pLogStore); // SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); // SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index 997129f8a1..58fa8c2e8f 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -25,7 +25,6 @@ static int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex from static int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); static SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); - SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { SSyncLogStore* pLogStore = taosMemoryMalloc(sizeof(SSyncLogStore)); assert(pLogStore != NULL); @@ -88,7 +87,9 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) { SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); - int32_t code = walReadWithHandle(pWalHandle, index); + ASSERT(pWalHandle != NULL); + + int32_t code = walReadWithHandle(pWalHandle, index); if (code != 0) { int32_t err = terrno; const char* errStr = tstrerror(err);