enh(sync) snapshot sender, receiver

This commit is contained in:
Minghao Li 2022-05-27 15:35:26 +08:00
parent 8141c430bc
commit b7be03898c
5 changed files with 35 additions and 15 deletions

View File

@ -55,6 +55,8 @@ typedef struct SVotesRespond SVotesRespond;
typedef struct SSyncIndexMgr SSyncIndexMgr; typedef struct SSyncIndexMgr SSyncIndexMgr;
typedef struct SRaftCfg SRaftCfg; typedef struct SRaftCfg SRaftCfg;
typedef struct SSyncRespMgr SSyncRespMgr; typedef struct SSyncRespMgr SSyncRespMgr;
typedef struct SSyncSnapshotSender SSyncSnapshotSender;
typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver;
typedef struct SSyncNode { typedef struct SSyncNode {
// init by SSyncInfo // init by SSyncInfo
@ -148,9 +150,11 @@ typedef struct SSyncNode {
SSyncRespMgr* pSyncRespMgr; SSyncRespMgr* pSyncRespMgr;
// restore state // restore state
bool restoreFinish;
// sem_t restoreSem; // sem_t restoreSem;
SSnapshot* pSnapshot; bool restoreFinish;
SSnapshot* pSnapshot;
SSyncSnapshotSender* pSender;
SSyncSnapshotReceiver* pReceiver;
} SSyncNode; } SSyncNode;

View File

@ -23,6 +23,7 @@ extern "C" {
#include <stdint.h> #include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include "cJSON.h"
#include "syncInt.h" #include "syncInt.h"
#include "taosdef.h" #include "taosdef.h"
@ -32,11 +33,14 @@ typedef struct SSyncSnapshotSender {
void * pCurrentBlock; void * pCurrentBlock;
int32_t len; int32_t len;
SSnapshot *pSnapshot; SSnapshot *pSnapshot;
SSyncNode *pSyncNode;
} SSyncSnapshotSender; } SSyncSnapshotSender;
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender); SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode);
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender); void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
int32_t snapshotSend(SSyncSnapshotSender *pSender); int32_t snapshotSend(SSyncSnapshotSender *pSender);
cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender);
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
typedef struct SSyncSnapshotReceiver { typedef struct SSyncSnapshotReceiver {
bool isStart; bool isStart;
@ -44,11 +48,14 @@ typedef struct SSyncSnapshotReceiver {
void * pCurrentBlock; void * pCurrentBlock;
int32_t len; int32_t len;
SSnapshot *pSnapshot; SSnapshot *pSnapshot;
SSyncNode *pSyncNode;
} SSyncSnapshotReceiver; } SSyncSnapshotReceiver;
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver); SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode);
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver);
int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver); int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver);
cJSON * snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -365,7 +365,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
} }
SReConfigCbMeta cbMeta = {0}; SReConfigCbMeta cbMeta = {0};
bool isDrop; bool isDrop;
// I am in newConfig // I am in newConfig
if (hit) { if (hit) {

View File

@ -15,14 +15,22 @@
#include "syncSnapshot.h" #include "syncSnapshot.h"
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender) { return 0; } SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode) { return NULL; }
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender) { return 0; } void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {}
int32_t snapshotSend(SSyncSnapshotSender *pSender) { return 0; } int32_t snapshotSend(SSyncSnapshotSender *pSender) { return 0; }
int32_t snapshotReceiverStart(SSyncSnapshotReceiver *pReceiver) { return 0; } cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) { return NULL; }
int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver) { return 0; } char *snapshotSender2Str(SSyncSnapshotSender *pSender) { return NULL; }
SSyncSnapshotReceiver *snapshotReceiverCreate(SSyncNode *pSyncNode) { return NULL; }
void snapshotReceiverDestroy(SSyncSnapshotReceiver *pReceiver) {}
int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver) { return 0; } int32_t snapshotReceive(SSyncSnapshotReceiver *pReceiver) { return 0; }
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) { return NULL; }
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { return NULL; }

View File

@ -78,7 +78,8 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); } void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu", cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term); sTrace("==callback== ==ReConfigCb== flag:0x%lX, isDrop:%d, index:%ld, code:%d, currentTerm:%lu, term:%lu",
cbMeta.flag, cbMeta.isDrop, cbMeta.index, cbMeta.code, cbMeta.currentTerm, cbMeta.term);
} }
SSyncFSM* createFsm() { SSyncFSM* createFsm() {