Merge branch 'feature/sync-mnode-integration' into fix/mnode

This commit is contained in:
Shengliang Guan 2022-05-25 17:06:35 +08:00
commit 077ea1b658
6 changed files with 76 additions and 14 deletions

View File

@ -98,8 +98,18 @@ typedef struct SSyncFSM {
void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm);
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
void* (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, void* iter, char** ppBuf, int32_t* len);
int32_t (*FpSnapshotApply)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len); // if (*ppIter == NULL)
// *ppIter = new iter;
// else
// *ppIter.next();
//
// if success, return 0. else return error code
int32_t (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, void** ppIter, char** ppBuf,
int32_t* len);
// apply data into fsm
int32_t (*FpSnapshotApply)(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char* pBuf, int32_t len);
void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta); void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta);

View File

@ -55,25 +55,33 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) {
} }
} }
void *mndSnapshotRead(struct SSyncFSM *pFsm, const SSnapshot *snapshot, void *iter, char **ppBuf, int32_t *len) { int32_t mndSnapshotRead(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, void** ppIter, char** ppBuf, int32_t* len) {
/*
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
SSdbIter *pIter = iter; SSdbIter *pIter;
if (iter == NULL) { if (iter == NULL) {
pIter = sdbIterInit(pMnode->pSdb); pIter = sdbIterInit(pMnode->sdb)
} else {
pIter = iter;
}
*/
return 0;
} }
return sdbIterRead(pMnode->pSdb, pIter, ppBuf, len); int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char* pBuf, int32_t len) {
}
int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len) {
SMnode *pMnode = pFsm->data; SMnode *pMnode = pFsm->data;
sdbWrite(pMnode->pSdb, (SSdbRaw*)pBuf); sdbWrite(pMnode->pSdb, (SSdbRaw*)pBuf);
return 0; return 0;
} }
void mndReConfig(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { void mndReConfig(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) {
mInfo("mndReConfig cbMeta.code:%d, cbMeta.currentTerm:%ld, cbMeta.term:%ld, cbMeta.index:%ld", cbMeta.code, cbMeta.currentTerm, cbMeta.term, cbMeta.index);
if (cbMeta.code == 0) {
// config change success
} else {
// config change failed
}
} }
SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) { SSyncFSM *mndSyncMakeFsm(SMnode *pMnode) {

View File

@ -357,6 +357,16 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
} else { } else {
syncNodeBecomeFollower(ths); syncNodeBecomeFollower(ths);
} }
// maybe newSyncCfg.myIndex is updated in syncNodeUpdateConfig
if (ths->pFsm->FpReConfigCb != NULL) {
SReConfigCbMeta cbMeta = {0};
cbMeta.code = 0;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.index = pEntry->index;
cbMeta.term = pEntry->term;
ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta);
}
} }
// restore finish // restore finish

View File

@ -134,6 +134,16 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
} else { } else {
syncNodeBecomeFollower(pSyncNode); syncNodeBecomeFollower(pSyncNode);
} }
// maybe newSyncCfg.myIndex is updated in syncNodeUpdateConfig
if (pSyncNode->pFsm->FpReConfigCb != NULL) {
SReConfigCbMeta cbMeta = {0};
cbMeta.code = 0;
cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm;
cbMeta.index = pEntry->index;
cbMeta.term = pEntry->term;
pSyncNode->pFsm->FpReConfigCb(pSyncNode->pFsm, newSyncCfg, cbMeta);
}
} }
// restore finish // restore finish

View File

@ -349,7 +349,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
} }
// open/close -------------- // open/close --------------
SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo;
SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode));
assert(pSyncNode != NULL); assert(pSyncNode != NULL);
memset(pSyncNode, 0, sizeof(SSyncNode)); memset(pSyncNode, 0, sizeof(SSyncNode));
@ -361,11 +363,25 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr()); sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr());
return NULL; return NULL;
} }
}
// create raft config file
snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path); snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path);
if (!taosCheckExistFile(pSyncNode->configPath)) {
// create raft config file
ret = syncCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), pSyncNode->configPath); ret = syncCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), pSyncNode->configPath);
assert(ret == 0); assert(ret == 0);
} else {
// update syncCfg by raft_config.json
pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath);
assert(pSyncNode->pRaftCfg != NULL);
pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg;
char* seralized = raftCfg2Str(pSyncNode->pRaftCfg);
sInfo("syncNodeOpen update config :%s", seralized);
taosMemoryFree(seralized);
raftCfgClose(pSyncNode->pRaftCfg);
} }
// init by SSyncInfo // init by SSyncInfo

View File

@ -49,7 +49,7 @@ void test4() {
logTest((char*)__FUNCTION__); logTest((char*)__FUNCTION__);
} }
int main() { int main(int argc, char** argv) {
// taosInitLog("tmp/syncTest.log", 100); // taosInitLog("tmp/syncTest.log", 100);
tsAsyncLog = 0; tsAsyncLog = 0;
@ -58,6 +58,14 @@ int main() {
test3(); test3();
test4(); test4();
if (argc == 2) {
bool bTaosDirExist = taosDirExist(argv[1]);
printf("%s bTaosDirExist:%d \n", argv[1], bTaosDirExist);
bool bTaosCheckExistFile = taosCheckExistFile(argv[1]);
printf("%s bTaosCheckExistFile:%d \n", argv[1], bTaosCheckExistFile);
}
// taosCloseLog(); // taosCloseLog();
return 0; return 0;
} }