diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 2e71745f61..4100aa0216 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -55,6 +55,8 @@ typedef struct SVotesRespond SVotesRespond; typedef struct SSyncIndexMgr SSyncIndexMgr; typedef struct SRaftCfg SRaftCfg; typedef struct SSyncRespMgr SSyncRespMgr; +typedef struct SSyncSnapshotSender SSyncSnapshotSender; +typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver; typedef struct SSyncNode { // init by SSyncInfo @@ -148,9 +150,11 @@ typedef struct SSyncNode { SSyncRespMgr* pSyncRespMgr; // restore state - bool restoreFinish; // sem_t restoreSem; - SSnapshot* pSnapshot; + bool restoreFinish; + SSnapshot* pSnapshot; + SSyncSnapshotSender* pSender; + SSyncSnapshotReceiver* pReceiver; } SSyncNode; diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 104da48616..b3174a4b36 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -23,6 +23,7 @@ extern "C" { #include #include #include +#include "cJSON.h" #include "syncInt.h" #include "taosdef.h" @@ -32,11 +33,14 @@ typedef struct SSyncSnapshotSender { void * pCurrentBlock; int32_t len; SSnapshot *pSnapshot; + SSyncNode *pSyncNode; } SSyncSnapshotSender; -int32_t snapshotSenderStart(SSyncSnapshotSender *pSender); -int32_t snapshotSenderStop(SSyncSnapshotSender *pSender); -int32_t snapshotSend(SSyncSnapshotSender *pSender); +SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode); +void snapshotSenderDestroy(SSyncSnapshotSender *pSender); +int32_t snapshotSend(SSyncSnapshotSender *pSender); +cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender); +char * snapshotSender2Str(SSyncSnapshotSender *pSender); typedef struct SSyncSnapshotReceiver { bool isStart; @@ -44,11 +48,14 @@ typedef struct SSyncSnapshotReceiver { void * pCurrentBlock; int32_t len; SSnapshot *pSnapshot; + SSyncNode *pSyncNode; } SSyncSnapshotReceiver; -int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); -int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); -int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver); +SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode); +void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver); +int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver); +cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); +char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 008bc00dbc..ca34e3ea77 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -365,7 +365,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } SReConfigCbMeta cbMeta = {0}; - bool isDrop; + bool isDrop; // I am in newConfig if (hit) { @@ -388,7 +388,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } // always call FpReConfigCb - if (ths->pFsm->FpReConfigCb != NULL) { + if (ths->pFsm->FpReConfigCb != NULL) { cbMeta.code = 0; cbMeta.currentTerm = ths->pRaftStore->currentTerm; cbMeta.index = pEntry->index; diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 5419adee2c..ccb0e6071b 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -15,14 +15,22 @@ #include "syncSnapshot.h" -int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { return 0; } +SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode) { return NULL; } -int32_t snapshotSenderStop(SSyncSnapshotSender *pSender) { return 0; } +void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {} int32_t snapshotSend(SSyncSnapshotSender *pSender) { return 0; } -int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) { return 0; } +cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { return NULL; } -int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { return 0; } +char *snapshotSender2Str(SSyncSnapshotSender *pSender) { return NULL; } + +SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode) { return NULL; } + +void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {} int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver) { return 0; } + +cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { return NULL; } + +char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { return NULL; } diff --git a/source/libs/sync/test/syncConfigChangeTest.cpp b/source/libs/sync/test/syncConfigChangeTest.cpp index 7efc3f50c0..1755b7a8fd 100644 --- a/source/libs/sync/test/syncConfigChangeTest.cpp +++ b/source/libs/sync/test/syncConfigChangeTest.cpp @@ -78,7 +78,8 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { - sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); + sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", + cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); } SSyncFSM* createFsm() {