From 0a5412f33d7ae055c251584d38a027d6b1e82be2 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 24 May 2022 21:00:14 +0800 Subject: [PATCH 1/3] refactor: modify FpSnapshotRead, FpSnapshotApply --- include/libs/sync/sync.h | 13 +++++++++++-- source/dnode/mnode/impl/src/mndSync.c | 6 +++--- 2 files changed, 14 insertions(+), 5 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 5ffcbb7a09..4ac4922e80 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -98,8 +98,17 @@ typedef struct SSyncFSM { void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); - void* (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, void* iter, char** ppBuf, int32_t* len); - int32_t (*FpSnapshotApply)(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len); + + // if (*ppIter == NULL) + // *ppIter = new iter; + // else + // *ppIter.next(); + // + // if success, return 0. else return error code + int32_t (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, void** ppIter, char** ppBuf, int32_t* len); + + // apply data into fsm + int32_t (*FpSnapshotApply)(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char* pBuf, int32_t len); void (*FpReConfigCb)(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 2b4b472c35..0ddb542b38 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -55,7 +55,7 @@ void mndRestoreFinish(struct SSyncFSM *pFsm) { pMnode->syncMgmt.restored = true; } -void* mndSnapshotRead(struct SSyncFSM* pFsm, const SSnapshot* snapshot, void* iter, char** ppBuf, int32_t* len) { +int32_t mndSnapshotRead(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, void** ppIter, char** ppBuf, int32_t* len) { /* SMnode *pMnode = pFsm->data; SSdbIter *pIter; @@ -68,10 +68,10 @@ void* mndSnapshotRead(struct SSyncFSM* pFsm, const SSnapshot* snapshot, void* it return pIter; */ - return NULL; + return 0; } -int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* snapshot, char* pBuf, int32_t len) { +int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char* pBuf, int32_t len) { SMnode *pMnode = pFsm->data; sdbWrite(pMnode->pSdb, (SSdbRaw*)pBuf); return 0; From 81d0798f5f801043ebb489f487b4761b119c5dad Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 25 May 2022 14:27:59 +0800 Subject: [PATCH 2/3] refactor: update config when there's one locally --- source/libs/sync/src/syncMain.c | 20 ++++++++++++++++++-- source/libs/sync/test/syncTest.cpp | 10 +++++++++- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 4aca68fa64..d128ed48b4 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -349,7 +349,9 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { } // open/close -------------- -SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { +SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { + SSyncInfo* pSyncInfo = (SSyncInfo*)pOldSyncInfo; + SSyncNode* pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); assert(pSyncNode != NULL); memset(pSyncNode, 0, sizeof(SSyncNode)); @@ -361,11 +363,25 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { sError("failed to create dir:%s since %s", pSyncInfo->path, terrstr()); return NULL; } + } + snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path); + if (!taosCheckExistFile(pSyncNode->configPath)) { // create raft config file - snprintf(pSyncNode->configPath, sizeof(pSyncNode->configPath), "%s/raft_config.json", pSyncInfo->path); ret = syncCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), pSyncNode->configPath); assert(ret == 0); + + } else { + // update syncCfg by raft_config.json + pSyncNode->pRaftCfg = raftCfgOpen(pSyncNode->configPath); + assert(pSyncNode->pRaftCfg != NULL); + pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; + + char *seralized = raftCfg2Str(pSyncNode->pRaftCfg); + sInfo("syncNodeOpen update config :%s", seralized); + taosMemoryFree(seralized); + + raftCfgClose(pSyncNode->pRaftCfg); } // init by SSyncInfo diff --git a/source/libs/sync/test/syncTest.cpp b/source/libs/sync/test/syncTest.cpp index 76024e061e..5d205d8efe 100644 --- a/source/libs/sync/test/syncTest.cpp +++ b/source/libs/sync/test/syncTest.cpp @@ -49,7 +49,7 @@ void test4() { logTest((char*)__FUNCTION__); } -int main() { +int main(int argc, char **argv) { // taosInitLog("tmp/syncTest.log", 100); tsAsyncLog = 0; @@ -58,6 +58,14 @@ int main() { test3(); test4(); + if (argc == 2) { + bool bTaosDirExist = taosDirExist(argv[1]); + printf("%s bTaosDirExist:%d \n", argv[1], bTaosDirExist); + + bool bTaosCheckExistFile = taosCheckExistFile(argv[1]); + printf("%s bTaosCheckExistFile:%d \n", argv[1], bTaosCheckExistFile); + } + // taosCloseLog(); return 0; } From 104a20757957d077412180b528453b2dd64e7c8d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 25 May 2022 14:43:45 +0800 Subject: [PATCH 3/3] FpReConfigCb --- include/libs/sync/sync.h | 9 +++++---- source/dnode/mnode/impl/src/mndSync.c | 1 + source/libs/sync/src/syncAppendEntries.c | 10 ++++++++++ source/libs/sync/src/syncCommit.c | 10 ++++++++++ source/libs/sync/src/syncMain.c | 2 +- source/libs/sync/test/syncTest.cpp | 2 +- 6 files changed, 28 insertions(+), 6 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 4ac4922e80..b14b7667d2 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -99,14 +99,15 @@ typedef struct SSyncFSM { void (*FpRestoreFinishCb)(struct SSyncFSM* pFsm); int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); - // if (*ppIter == NULL) + // if (*ppIter == NULL) // *ppIter = new iter; - // else + // else // *ppIter.next(); // // if success, return 0. else return error code - int32_t (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, void** ppIter, char** ppBuf, int32_t* len); - + int32_t (*FpSnapshotRead)(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, void** ppIter, char** ppBuf, + int32_t* len); + // apply data into fsm int32_t (*FpSnapshotApply)(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char* pBuf, int32_t len); diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index f127991c46..50425761ba 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -74,6 +74,7 @@ int32_t mndSnapshotApply(struct SSyncFSM* pFsm, const SSnapshot* pSnapshot, char } void mndReConfig(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta) { + mInfo("mndReConfig cbMeta.code:%d, cbMeta.currentTerm:%ld, cbMeta.term:%ld, cbMeta.index:%ld", cbMeta.code, cbMeta.currentTerm, cbMeta.term, cbMeta.index); if (cbMeta.code == 0) { // config change success } else { diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 0411628c5c..c9e16c53c8 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -357,6 +357,16 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { } else { syncNodeBecomeFollower(ths); } + + // maybe newSyncCfg.myIndex is updated in syncNodeUpdateConfig + if (ths->pFsm->FpReConfigCb != NULL) { + SReConfigCbMeta cbMeta = {0}; + cbMeta.code = 0; + cbMeta.currentTerm = ths->pRaftStore->currentTerm; + cbMeta.index = pEntry->index; + cbMeta.term = pEntry->term; + ths->pFsm->FpReConfigCb(ths->pFsm, newSyncCfg, cbMeta); + } } // restore finish diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index 36713ceed5..a3d480956e 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -134,6 +134,16 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { } else { syncNodeBecomeFollower(pSyncNode); } + + // maybe newSyncCfg.myIndex is updated in syncNodeUpdateConfig + if (pSyncNode->pFsm->FpReConfigCb != NULL) { + SReConfigCbMeta cbMeta = {0}; + cbMeta.code = 0; + cbMeta.currentTerm = pSyncNode->pRaftStore->currentTerm; + cbMeta.index = pEntry->index; + cbMeta.term = pEntry->term; + pSyncNode->pFsm->FpReConfigCb(pSyncNode->pFsm, newSyncCfg, cbMeta); + } } // restore finish diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d128ed48b4..e4b6fc215f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -377,7 +377,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) { assert(pSyncNode->pRaftCfg != NULL); pSyncInfo->syncCfg = pSyncNode->pRaftCfg->cfg; - char *seralized = raftCfg2Str(pSyncNode->pRaftCfg); + char* seralized = raftCfg2Str(pSyncNode->pRaftCfg); sInfo("syncNodeOpen update config :%s", seralized); taosMemoryFree(seralized); diff --git a/source/libs/sync/test/syncTest.cpp b/source/libs/sync/test/syncTest.cpp index 5d205d8efe..ffe8b81571 100644 --- a/source/libs/sync/test/syncTest.cpp +++ b/source/libs/sync/test/syncTest.cpp @@ -49,7 +49,7 @@ void test4() { logTest((char*)__FUNCTION__); } -int main(int argc, char **argv) { +int main(int argc, char** argv) { // taosInitLog("tmp/syncTest.log", 100); tsAsyncLog = 0;