enh(sync): add SSnapshot callback: reader, writer
This commit is contained in:
parent
3327a0692f
commit
cd2e8fe730
|
@ -66,12 +66,6 @@ typedef struct SSyncCfg {
|
||||||
SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
|
SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
|
||||||
} SSyncCfg;
|
} SSyncCfg;
|
||||||
|
|
||||||
typedef struct SSnapshot {
|
|
||||||
void* data;
|
|
||||||
SyncIndex lastApplyIndex;
|
|
||||||
SyncTerm lastApplyTerm;
|
|
||||||
} SSnapshot;
|
|
||||||
|
|
||||||
typedef struct SFsmCbMeta {
|
typedef struct SFsmCbMeta {
|
||||||
SyncIndex index;
|
SyncIndex index;
|
||||||
bool isWeak;
|
bool isWeak;
|
||||||
|
@ -93,6 +87,12 @@ typedef struct SReConfigCbMeta {
|
||||||
uint64_t flag;
|
uint64_t flag;
|
||||||
} SReConfigCbMeta;
|
} SReConfigCbMeta;
|
||||||
|
|
||||||
|
typedef struct SSnapshot {
|
||||||
|
void *data;
|
||||||
|
SyncIndex lastApplyIndex;
|
||||||
|
SyncTerm lastApplyTerm;
|
||||||
|
} SSnapshot;
|
||||||
|
|
||||||
typedef struct SSyncFSM {
|
typedef struct SSyncFSM {
|
||||||
void* data;
|
void* data;
|
||||||
|
|
||||||
|
@ -101,23 +101,17 @@ typedef struct SSyncFSM {
|
||||||
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||||
|
|
||||||
void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm);
|
void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm);
|
||||||
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
|
||||||
|
|
||||||
// if (*ppIter == NULL)
|
|
||||||
// *ppIter = new iter;
|
|
||||||
// else
|
|
||||||
// *ppIter.next();
|
|
||||||
//
|
|
||||||
// if success, return 0. else return error code
|
|
||||||
int32_t (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, void** ppIter, char** ppBuf,
|
|
||||||
int32_t* len);
|
|
||||||
|
|
||||||
// apply data into fsm
|
|
||||||
int32_t (*FpSnapshotApply)(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char* pBuf, int32_t len);
|
|
||||||
|
|
||||||
void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta);
|
void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta);
|
||||||
|
|
||||||
// int32_t (*FpRestoreSnapshot)(struct SSyncFSM* pFsm, const SSnapshot* snapshot);
|
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
||||||
|
|
||||||
|
int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader);
|
||||||
|
int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader);
|
||||||
|
int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
|
||||||
|
|
||||||
|
int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void** ppWriter);
|
||||||
|
int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply);
|
||||||
|
int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len);
|
||||||
|
|
||||||
} SSyncFSM;
|
} SSyncFSM;
|
||||||
|
|
||||||
|
|
|
@ -105,6 +105,8 @@ void mndReConfig(struct SSyncFSM *pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta)
|
||||||
|
|
||||||
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||||
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||||
|
memset(pFsm, 0, sizeof(*pFsm));
|
||||||
|
|
||||||
pFsm->data = pMnode;
|
pFsm->data = pMnode;
|
||||||
|
|
||||||
pFsm->FpCommitCb = mndSyncCommitMsg;
|
pFsm->FpCommitCb = mndSyncCommitMsg;
|
||||||
|
@ -113,8 +115,6 @@ SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {
|
||||||
|
|
||||||
pFsm->FpGetSnapshot = mndSyncGetSnapshot;
|
pFsm->FpGetSnapshot = mndSyncGetSnapshot;
|
||||||
pFsm->FpRestoreFinishCb = mndRestoreFinish;
|
pFsm->FpRestoreFinishCb = mndRestoreFinish;
|
||||||
pFsm->FpSnapshotRead = mndSnapshotRead;
|
|
||||||
pFsm->FpSnapshotApply = mndSnapshotApply;
|
|
||||||
pFsm->FpReConfigCb = mndReConfig;
|
pFsm->FpReConfigCb = mndReConfig;
|
||||||
|
|
||||||
return pFsm;
|
return pFsm;
|
||||||
|
|
|
@ -142,14 +142,13 @@ void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta
|
||||||
|
|
||||||
SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
SSyncFSM *pFsm = taosMemoryCalloc(1, sizeof(SSyncFSM));
|
||||||
|
memset(pFsm, 0, sizeof(*pFsm));
|
||||||
pFsm->data = pVnode;
|
pFsm->data = pVnode;
|
||||||
pFsm->FpCommitCb = vnodeSyncCommitMsg;
|
pFsm->FpCommitCb = vnodeSyncCommitMsg;
|
||||||
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
|
pFsm->FpPreCommitCb = vnodeSyncPreCommitMsg;
|
||||||
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
|
pFsm->FpRollBackCb = vnodeSyncRollBackMsg;
|
||||||
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
|
pFsm->FpGetSnapshot = vnodeSyncGetSnapshot;
|
||||||
pFsm->FpRestoreFinishCb = NULL;
|
pFsm->FpRestoreFinishCb = NULL;
|
||||||
pFsm->FpSnapshotRead = NULL;
|
|
||||||
pFsm->FpSnapshotApply = NULL;
|
|
||||||
pFsm->FpReConfigCb = NULL;
|
pFsm->FpReConfigCb = NULL;
|
||||||
|
|
||||||
return pFsm;
|
return pFsm;
|
||||||
|
|
|
@ -28,10 +28,12 @@ extern "C" {
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
|
|
||||||
typedef struct SSyncSnapshotSender {
|
typedef struct SSyncSnapshotSender {
|
||||||
bool isStart;
|
int32_t sending;
|
||||||
int32_t progressIndex;
|
int32_t received;
|
||||||
|
bool finish;
|
||||||
void * pCurrentBlock;
|
void * pCurrentBlock;
|
||||||
int32_t len;
|
int32_t blockLen;
|
||||||
|
int64_t sendingMS;
|
||||||
SSnapshot *pSnapshot;
|
SSnapshot *pSnapshot;
|
||||||
SSyncNode *pSyncNode;
|
SSyncNode *pSyncNode;
|
||||||
} SSyncSnapshotSender;
|
} SSyncSnapshotSender;
|
||||||
|
@ -43,7 +45,8 @@ cJSON * snapshotSender2Json(SSyncSnapshotSender *pSender);
|
||||||
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
|
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
|
||||||
|
|
||||||
typedef struct SSyncSnapshotReceiver {
|
typedef struct SSyncSnapshotReceiver {
|
||||||
bool isStart;
|
bool start;
|
||||||
|
int32_t received;
|
||||||
int32_t progressIndex;
|
int32_t progressIndex;
|
||||||
void * pCurrentBlock;
|
void * pCurrentBlock;
|
||||||
int32_t len;
|
int32_t len;
|
||||||
|
|
|
@ -58,8 +58,8 @@ int32_t raftCfgPersist(SRaftCfg *pRaftCfg) {
|
||||||
int64_t ret = taosWriteFile(pRaftCfg->pFile, buf, sizeof(buf));
|
int64_t ret = taosWriteFile(pRaftCfg->pFile, buf, sizeof(buf));
|
||||||
assert(ret == sizeof(buf));
|
assert(ret == sizeof(buf));
|
||||||
|
|
||||||
//int64_t ret = taosWriteFile(pRaftCfg->pFile, s, strlen(s) + 1);
|
// int64_t ret = taosWriteFile(pRaftCfg->pFile, s, strlen(s) + 1);
|
||||||
//assert(ret == strlen(s) + 1);
|
// assert(ret == strlen(s) + 1);
|
||||||
|
|
||||||
taosMemoryFree(s);
|
taosMemoryFree(s);
|
||||||
taosFsyncFile(pRaftCfg->pFile);
|
taosFsyncFile(pRaftCfg->pFile);
|
||||||
|
@ -170,7 +170,7 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, int8_t isStandBy, const char *path) {
|
||||||
SRaftCfg raftCfg;
|
SRaftCfg raftCfg;
|
||||||
raftCfg.cfg = *pCfg;
|
raftCfg.cfg = *pCfg;
|
||||||
raftCfg.isStandBy = isStandBy;
|
raftCfg.isStandBy = isStandBy;
|
||||||
char * s = raftCfg2Str(&raftCfg);
|
char *s = raftCfg2Str(&raftCfg);
|
||||||
|
|
||||||
char buf[CONFIG_FILE_LEN];
|
char buf[CONFIG_FILE_LEN];
|
||||||
memset(buf, 0, sizeof(buf));
|
memset(buf, 0, sizeof(buf));
|
||||||
|
@ -179,8 +179,8 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, int8_t isStandBy, const char *path) {
|
||||||
int64_t ret = taosWriteFile(pFile, buf, sizeof(buf));
|
int64_t ret = taosWriteFile(pFile, buf, sizeof(buf));
|
||||||
assert(ret == sizeof(buf));
|
assert(ret == sizeof(buf));
|
||||||
|
|
||||||
//int64_t ret = taosWriteFile(pFile, s, strlen(s) + 1);
|
// int64_t ret = taosWriteFile(pFile, s, strlen(s) + 1);
|
||||||
//assert(ret == strlen(s) + 1);
|
// assert(ret == strlen(s) + 1);
|
||||||
|
|
||||||
taosMemoryFree(s);
|
taosMemoryFree(s);
|
||||||
taosCloseFile(&pFile);
|
taosCloseFile(&pFile);
|
||||||
|
|
|
@ -84,14 +84,15 @@ void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta)
|
||||||
|
|
||||||
SSyncFSM* createFsm() {
|
SSyncFSM* createFsm() {
|
||||||
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
|
SSyncFSM* pFsm = (SSyncFSM*)taosMemoryMalloc(sizeof(SSyncFSM));
|
||||||
|
memset(pFsm, 0, sizeof(*pFsm));
|
||||||
|
|
||||||
pFsm->FpCommitCb = CommitCb;
|
pFsm->FpCommitCb = CommitCb;
|
||||||
pFsm->FpPreCommitCb = PreCommitCb;
|
pFsm->FpPreCommitCb = PreCommitCb;
|
||||||
pFsm->FpRollBackCb = RollBackCb;
|
pFsm->FpRollBackCb = RollBackCb;
|
||||||
|
|
||||||
pFsm->FpGetSnapshot = GetSnapshotCb;
|
pFsm->FpGetSnapshot = GetSnapshotCb;
|
||||||
pFsm->FpRestoreFinishCb = RestoreFinishCb;
|
pFsm->FpRestoreFinishCb = RestoreFinishCb;
|
||||||
pFsm->FpSnapshotApply = NULL;
|
|
||||||
pFsm->FpSnapshotRead = NULL;
|
|
||||||
|
|
||||||
pFsm->FpReConfigCb = ReConfigCb;
|
pFsm->FpReConfigCb = ReConfigCb;
|
||||||
|
|
||||||
|
|
|
@ -75,6 +75,7 @@ int32_t GetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot) {
|
||||||
|
|
||||||
void initFsm() {
|
void initFsm() {
|
||||||
pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM));
|
pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM));
|
||||||
|
memset(pFsm, 0, sizeof(*pFsm));
|
||||||
pFsm->FpCommitCb = CommitCb;
|
pFsm->FpCommitCb = CommitCb;
|
||||||
pFsm->FpPreCommitCb = PreCommitCb;
|
pFsm->FpPreCommitCb = PreCommitCb;
|
||||||
pFsm->FpRollBackCb = RollBackCb;
|
pFsm->FpRollBackCb = RollBackCb;
|
||||||
|
|
Loading…
Reference in New Issue