Call wal and sync code in vnode
This commit is contained in:
parent
8ad7c2fd26
commit
d95e8e1f6c
|
@ -46,14 +46,14 @@ typedef struct {
|
||||||
} SNodeInfo;
|
} SNodeInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int selfIndex;
|
int32_t selfIndex;
|
||||||
int replica;
|
int32_t replica;
|
||||||
SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
|
SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
|
||||||
} SSyncCluster;
|
} SSyncCluster;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t selfIndex;
|
int32_t selfIndex;
|
||||||
int replica;
|
int32_t replica;
|
||||||
SNodeInfo node[TSDB_MAX_REPLICA];
|
SNodeInfo node[TSDB_MAX_REPLICA];
|
||||||
ESyncRole role[TSDB_MAX_REPLICA];
|
ESyncRole role[TSDB_MAX_REPLICA];
|
||||||
} SNodesRole;
|
} SNodesRole;
|
||||||
|
@ -62,20 +62,20 @@ typedef struct SSyncFSM {
|
||||||
void* pData;
|
void* pData;
|
||||||
|
|
||||||
// apply committed log, bufs will be free by raft module
|
// apply committed log, bufs will be free by raft module
|
||||||
int (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData);
|
int32_t (*applyLog)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf, void* pData);
|
||||||
|
|
||||||
// cluster commit callback
|
// cluster commit callback
|
||||||
int (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData);
|
int32_t (*onClusterChanged)(struct SSyncFSM* fsm, const SSyncCluster* cluster, void* pData);
|
||||||
|
|
||||||
// fsm return snapshot in ppBuf, bufs will be free by raft module
|
// fsm return snapshot in ppBuf, bufs will be free by raft module
|
||||||
// TODO: getSnapshot SHOULD be async?
|
// TODO: getSnapshot SHOULD be async?
|
||||||
int (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int* objId, bool* isLast);
|
int32_t (*getSnapshot)(struct SSyncFSM* fsm, SSyncBuffer** ppBuf, int32_t* objId, bool* isLast);
|
||||||
|
|
||||||
// fsm apply snapshot with pBuf data
|
// fsm apply snapshot with pBuf data
|
||||||
int (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int objId, bool isLast);
|
int32_t (*applySnapshot)(struct SSyncFSM* fsm, SSyncBuffer* pBuf, int32_t objId, bool isLast);
|
||||||
|
|
||||||
// call when restore snapshot and log done
|
// call when restore snapshot and log done
|
||||||
int (*onRestoreDone)(struct SSyncFSM* fsm);
|
int32_t (*onRestoreDone)(struct SSyncFSM* fsm);
|
||||||
|
|
||||||
void (*onRollback)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf);
|
void (*onRollback)(struct SSyncFSM* fsm, SyncIndex index, const SSyncBuffer* buf);
|
||||||
|
|
||||||
|
@ -118,9 +118,9 @@ typedef struct SSyncClusterConfig {
|
||||||
typedef struct SStateManager {
|
typedef struct SStateManager {
|
||||||
void* pData;
|
void* pData;
|
||||||
|
|
||||||
void (*saveServerState)(struct SStateManager* stateMng, const SSyncServerState* state);
|
int32_t (*saveServerState)(struct SStateManager* stateMng, SSyncServerState* state);
|
||||||
|
|
||||||
const SSyncServerState* (*readServerState)(struct SStateManager* stateMng);
|
int32_t (*readServerState)(struct SStateManager* stateMng, SSyncServerState* state);
|
||||||
|
|
||||||
// void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster);
|
// void (*saveCluster)(struct SStateManager* stateMng, const SSyncClusterConfig* cluster);
|
||||||
|
|
||||||
|
@ -148,9 +148,9 @@ void syncStop(const SSyncNode*);
|
||||||
|
|
||||||
int32_t syncPropose(SSyncNode* syncNode, SSyncBuffer buffer, void* pData, bool isWeak);
|
int32_t syncPropose(SSyncNode* syncNode, SSyncBuffer buffer, void* pData, bool isWeak);
|
||||||
|
|
||||||
//int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode);
|
// int32_t syncAddNode(SSyncNode syncNode, const SNodeInfo *pNode);
|
||||||
|
|
||||||
//int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode);
|
// int32_t syncRemoveNode(SSyncNode syncNode, const SNodeInfo *pNode);
|
||||||
|
|
||||||
extern int32_t syncDebugFlag;
|
extern int32_t syncDebugFlag;
|
||||||
|
|
||||||
|
|
|
@ -44,41 +44,41 @@ typedef struct {
|
||||||
EWalType walLevel; // wal level
|
EWalType walLevel; // wal level
|
||||||
} SWalCfg;
|
} SWalCfg;
|
||||||
|
|
||||||
typedef void * twalh; // WAL HANDLE
|
struct SWal;
|
||||||
typedef int32_t FWalWrite(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
|
typedef struct SWal SWal; // WAL HANDLE
|
||||||
|
typedef int32_t (*FWalWrite)(void *ahandle, void *pHead, int32_t qtype, void *pMsg);
|
||||||
|
|
||||||
//module initialization
|
// module initialization
|
||||||
int32_t walInit();
|
int32_t walInit();
|
||||||
void walCleanUp();
|
void walCleanUp();
|
||||||
|
|
||||||
//handle open and ctl
|
// handle open and ctl
|
||||||
twalh walOpen(char *path, SWalCfg *pCfg);
|
SWal *walOpen(char *path, SWalCfg *pCfg);
|
||||||
int32_t walAlter(twalh, SWalCfg *pCfg);
|
int32_t walAlter(SWal *, SWalCfg *pCfg);
|
||||||
void walStop(twalh);
|
void walClose(SWal *);
|
||||||
void walClose(twalh);
|
|
||||||
|
|
||||||
//write
|
// write
|
||||||
//int64_t walWriteWithMsgType(twalh, int8_t msgType, void* body, int32_t bodyLen);
|
// int64_t walWriteWithMsgType(SWal*, int8_t msgType, void* body, int32_t bodyLen);
|
||||||
int64_t walWrite(twalh, void* body, int32_t bodyLen);
|
int64_t walWrite(SWal *, int64_t index, void *body, int32_t bodyLen);
|
||||||
int64_t walWriteBatch(twalh, void** bodies, int32_t* bodyLen, int32_t batchSize);
|
int64_t walWriteBatch(SWal *, void **bodies, int32_t *bodyLen, int32_t batchSize);
|
||||||
|
|
||||||
//apis for lifecycle management
|
// apis for lifecycle management
|
||||||
void walFsync(twalh, bool force);
|
void walFsync(SWal *, bool force);
|
||||||
int32_t walCommit(twalh, int64_t ver);
|
int32_t walCommit(SWal *, int64_t ver);
|
||||||
//truncate after
|
// truncate after
|
||||||
int32_t walRollback(twalh, int64_t ver);
|
int32_t walRollback(SWal *, int64_t ver);
|
||||||
//notify that previous log can be pruned safely
|
// notify that previous log can be pruned safely
|
||||||
int32_t walPrune(twalh, int64_t ver);
|
int32_t walPrune(SWal *, int64_t ver);
|
||||||
|
|
||||||
//read
|
// read
|
||||||
int32_t walRead(twalh, SWalHead **, int64_t ver);
|
int32_t walRead(SWal *, SWalHead **, int64_t ver);
|
||||||
int32_t walReadWithFp(twalh, FWalWrite writeFp, int64_t verStart, int readNum);
|
int32_t walReadWithFp(SWal *, FWalWrite writeFp, int64_t verStart, int32_t readNum);
|
||||||
|
|
||||||
//lifecycle check
|
// lifecycle check
|
||||||
int32_t walFirstVer(twalh);
|
int32_t walFirstVer(SWal *);
|
||||||
int32_t walPersistedVer(twalh);
|
int32_t walPersistedVer(SWal *);
|
||||||
int32_t walLastVer(twalh);
|
int32_t walLastVer(SWal *);
|
||||||
//int32_t walDataCorrupted(twalh);
|
// int32_t walDataCorrupted(SWal*);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,18 @@ int32_t walInit() { return 0; }
|
||||||
|
|
||||||
void walCleanUp() {}
|
void walCleanUp() {}
|
||||||
|
|
||||||
twalh walOpen(char *path, SWalCfg *pCfg) { return NULL; }
|
SWal *walOpen(char *path, SWalCfg *pCfg) { return NULL; }
|
||||||
|
|
||||||
int32_t walAlter(twalh pWal, SWalCfg *pCfg) { return 0; }
|
int32_t walAlter(SWal *pWal, SWalCfg *pCfg) { return 0; }
|
||||||
|
|
||||||
|
void walClose(SWal *pWal) {}
|
||||||
|
|
||||||
|
void walFsync(SWal *pWal, bool force) {}
|
||||||
|
|
||||||
|
int64_t walWrite(SWal *pWal, int64_t index, void *body, int32_t bodyLen) {}
|
||||||
|
|
||||||
|
int32_t walCommit(SWal *pWal, int64_t ver) { return 0; }
|
||||||
|
|
||||||
|
int32_t walRollback(SWal *pWal, int64_t ver) { return 0; }
|
||||||
|
|
||||||
|
int32_t walPrune(SWal *pWal, int64_t ver) { return 0; }
|
|
@ -23,8 +23,8 @@ extern "C" {
|
||||||
|
|
||||||
int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg);
|
int32_t vnodeReadCfg(int32_t vgId, SVnodeCfg *pCfg);
|
||||||
int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg);
|
int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg);
|
||||||
int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState);
|
int32_t vnodeReadState(int32_t vgId, SSyncServerState *pState);
|
||||||
int32_t vnodeWriteTerm(int32_t vgid, SSyncServerState *pState);
|
int32_t vnodeSaveState(int32_t vgid, SSyncServerState *pState);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,7 +79,7 @@ typedef struct {
|
||||||
SMeta *pMeta;
|
SMeta *pMeta;
|
||||||
STsdb *pTsdb;
|
STsdb *pTsdb;
|
||||||
STQ *pTQ;
|
STQ *pTQ;
|
||||||
twalh pWal;
|
SWal *pWal;
|
||||||
void *pQuery;
|
void *pQuery;
|
||||||
SSyncNode *pSync;
|
SSyncNode *pSync;
|
||||||
taos_queue pWriteQ; // write queue
|
taos_queue pWriteQ; // write queue
|
||||||
|
|
|
@ -296,7 +296,7 @@ int32_t vnodeWriteCfg(int32_t vgId, SVnodeCfg *pCfg) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) {
|
int32_t vnodeReadState(int32_t vgId, SSyncServerState *pState) {
|
||||||
int32_t ret = TSDB_CODE_VND_APP_ERROR;
|
int32_t ret = TSDB_CODE_VND_APP_ERROR;
|
||||||
int32_t len = 0;
|
int32_t len = 0;
|
||||||
int32_t maxLen = 100;
|
int32_t maxLen = 100;
|
||||||
|
@ -305,7 +305,7 @@ int32_t vnodeReadTerm(int32_t vgId, SSyncServerState *pState) {
|
||||||
FILE *fp = NULL;
|
FILE *fp = NULL;
|
||||||
|
|
||||||
char file[PATH_MAX + 30] = {0};
|
char file[PATH_MAX + 30] = {0};
|
||||||
sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId);
|
sprintf(file, "%s/vnode%d/state.json", tsVnodeDir, vgId);
|
||||||
|
|
||||||
len = (int32_t)fread(content, 1, maxLen, fp);
|
len = (int32_t)fread(content, 1, maxLen, fp);
|
||||||
if (len <= 0) {
|
if (len <= 0) {
|
||||||
|
@ -343,9 +343,9 @@ PARSE_TERM_ERROR:
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t vnodeWriteTerm(int32_t vgId, SSyncServerState *pState) {
|
int32_t vnodeSaveState(int32_t vgId, SSyncServerState *pState) {
|
||||||
char file[PATH_MAX + 30] = {0};
|
char file[PATH_MAX + 30] = {0};
|
||||||
sprintf(file, "%s/vnode%d/term.json", tsVnodeDir, vgId);
|
sprintf(file, "%s/vnode%d/state.json", tsVnodeDir, vgId);
|
||||||
|
|
||||||
FILE *fp = fopen(file, "w");
|
FILE *fp = fopen(file, "w");
|
||||||
if (!fp) {
|
if (!fp) {
|
||||||
|
|
|
@ -130,7 +130,8 @@ static void vnodeDestroyVnode(SVnode *pVnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->pWal) {
|
if (pVnode->pWal) {
|
||||||
// todo
|
walClose(pVnode->pWal);
|
||||||
|
pVnode->pWal = NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (pVnode->allocator) {
|
if (pVnode->allocator) {
|
||||||
|
@ -166,6 +167,56 @@ static void vnodeCleanupVnode(SVnode *pVnode) {
|
||||||
vnodeRelease(pVnode);
|
vnodeRelease(pVnode);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static inline int32_t vnodeLogWrite(struct SSyncLogStore *logStore, SyncIndex index, SSyncBuffer *pBuf) {
|
||||||
|
SVnode *pVnode = logStore->pData; // vnode status can be checked here
|
||||||
|
return walWrite(pVnode->pWal, index, pBuf->data, (int32_t)pBuf->len);
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int32_t vnodeLogCommit(struct SSyncLogStore *logStore, SyncIndex index) {
|
||||||
|
SVnode *pVnode = logStore->pData; // vnode status can be checked here
|
||||||
|
return walCommit(pVnode->pWal, index);
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int32_t vnodeLogPrune(struct SSyncLogStore *logStore, SyncIndex index) {
|
||||||
|
SVnode *pVnode = logStore->pData; // vnode status can be checked here
|
||||||
|
return walPrune(pVnode->pWal, index);
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int32_t vnodeLogRollback(struct SSyncLogStore *logStore, SyncIndex index) {
|
||||||
|
SVnode *pVnode = logStore->pData; // vnode status can be checked here
|
||||||
|
return walRollback(pVnode->pWal, index);
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int32_t vnodeSaveServerState(struct SStateManager *stateMng, SSyncServerState *pState) {
|
||||||
|
SVnode *pVnode = stateMng->pData;
|
||||||
|
return vnodeSaveState(pVnode->vgId, pState);
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int32_t vnodeReadServerState(struct SStateManager *stateMng, SSyncServerState *pState) {
|
||||||
|
SVnode *pVnode = stateMng->pData;
|
||||||
|
return vnodeSaveState(pVnode->vgId, pState);
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int32_t vnodeApplyLog(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf, void *pData) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int32_t vnodeOnClusterChanged(struct SSyncFSM *fsm, const SSyncCluster *cluster, void *pData) { return 0; }
|
||||||
|
|
||||||
|
static inline int32_t vnodeGetSnapshot(struct SSyncFSM *fsm, SSyncBuffer **ppBuf, int32_t *objId, bool *isLast) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int32_t vnodeApplySnapshot(struct SSyncFSM *fsm, SSyncBuffer *pBuf, int32_t objId, bool isLast) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static inline int32_t vnodeOnRestoreDone(struct SSyncFSM *fsm) { return 0; }
|
||||||
|
|
||||||
|
static inline void vnodeOnRollback(struct SSyncFSM *fsm, SyncIndex index, const SSyncBuffer *buf) {}
|
||||||
|
|
||||||
|
static inline void vnodeOnRoleChanged(struct SSyncFSM *fsm, const SNodesRole *pRole) {}
|
||||||
|
|
||||||
static int32_t vnodeOpenVnode(int32_t vgId) {
|
static int32_t vnodeOpenVnode(int32_t vgId) {
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -193,7 +244,7 @@ static int32_t vnodeOpenVnode(int32_t vgId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
code = vnodeReadTerm(vgId, &pVnode->term);
|
code = vnodeSaveState(vgId, &pVnode->term);
|
||||||
if (code != TSDB_CODE_SUCCESS) {
|
if (code != TSDB_CODE_SUCCESS) {
|
||||||
vError("vgId:%d, failed to read term file since %s", pVnode->vgId, tstrerror(code));
|
vError("vgId:%d, failed to read term file since %s", pVnode->vgId, tstrerror(code));
|
||||||
pVnode->cfg.dropped = 1;
|
pVnode->cfg.dropped = 1;
|
||||||
|
@ -220,25 +271,24 @@ static int32_t vnodeOpenVnode(int32_t vgId) {
|
||||||
// create sync node
|
// create sync node
|
||||||
SSyncInfo syncInfo = {0};
|
SSyncInfo syncInfo = {0};
|
||||||
syncInfo.vgId = vgId;
|
syncInfo.vgId = vgId;
|
||||||
syncInfo.snapshotIndex = 0; // todo, from tsdb
|
syncInfo.snapshotIndex = 0; // todo, from tsdb
|
||||||
memcpy(&syncInfo.syncCfg, &pVnode->cfg.sync, sizeof(SSyncCluster));
|
memcpy(&syncInfo.syncCfg, &pVnode->cfg.sync, sizeof(SSyncCluster));
|
||||||
syncInfo.fsm.pData = pVnode;
|
syncInfo.fsm.pData = pVnode;
|
||||||
syncInfo.fsm.applyLog = NULL;
|
syncInfo.fsm.applyLog = vnodeApplyLog;
|
||||||
syncInfo.fsm.onClusterChanged = NULL;
|
syncInfo.fsm.onClusterChanged = vnodeOnClusterChanged;
|
||||||
syncInfo.fsm.getSnapshot = NULL;
|
syncInfo.fsm.getSnapshot = vnodeGetSnapshot;
|
||||||
syncInfo.fsm.applySnapshot = NULL;
|
syncInfo.fsm.applySnapshot = vnodeApplySnapshot;
|
||||||
syncInfo.fsm.onRestoreDone = NULL;
|
syncInfo.fsm.onRestoreDone = vnodeOnRestoreDone;
|
||||||
syncInfo.fsm.onRollback = NULL;
|
syncInfo.fsm.onRollback = vnodeOnRollback;
|
||||||
|
syncInfo.fsm.onRoleChanged = vnodeOnRoleChanged;
|
||||||
syncInfo.logStore.pData = pVnode;
|
syncInfo.logStore.pData = pVnode;
|
||||||
syncInfo.logStore.logWrite = NULL;
|
syncInfo.logStore.logWrite = vnodeLogWrite;
|
||||||
syncInfo.logStore.logCommit = NULL;
|
syncInfo.logStore.logCommit = vnodeLogCommit;
|
||||||
syncInfo.logStore.logPrune = NULL;
|
syncInfo.logStore.logPrune = vnodeLogPrune;
|
||||||
syncInfo.logStore.logRollback = NULL;
|
syncInfo.logStore.logRollback = vnodeLogRollback;
|
||||||
syncInfo.stateManager.pData = pVnode;
|
syncInfo.stateManager.pData = pVnode;
|
||||||
syncInfo.stateManager.saveServerState = NULL;
|
syncInfo.stateManager.saveServerState = vnodeSaveServerState;
|
||||||
syncInfo.stateManager.readServerState = NULL;
|
syncInfo.stateManager.readServerState = vnodeReadServerState;
|
||||||
// syncInfo.stateManager.saveCluster = NULL;
|
|
||||||
// syncInfo.stateManager.readCluster = NULL;
|
|
||||||
|
|
||||||
pVnode->pSync = syncStart(&syncInfo);
|
pVnode->pSync = syncStart(&syncInfo);
|
||||||
if (pVnode->pSync == NULL) {
|
if (pVnode->pSync == NULL) {
|
||||||
|
|
Loading…
Reference in New Issue