From 0c8f62f701d263be26c84a30c0fbee300af39ba7 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 9 Mar 2022 14:51:02 +0800 Subject: [PATCH 01/12] sync refactor --- source/libs/sync/inc/syncInt.h | 2 +- source/libs/sync/inc/syncRaftEntry.h | 12 ++- source/libs/sync/inc/syncRaftLog.h | 37 ++++++--- source/libs/sync/inc/syncVoteMgr.h | 8 +- source/libs/sync/src/syncMain.c | 25 ++++-- source/libs/sync/src/syncRaftLog.c | 83 +++++++++++-------- source/libs/sync/test/syncIndexMgrTest.cpp | 1 - source/libs/sync/test/syncInitTest.cpp | 1 - source/libs/sync/test/syncPingTest.cpp | 1 - .../libs/sync/test/syncVotesGrantedTest.cpp | 1 - .../libs/sync/test/syncVotesRespondTest.cpp | 1 - 11 files changed, 105 insertions(+), 67 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8b77e292c4..1ca705441d 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -116,7 +116,7 @@ typedef struct SSyncNode { SyncGroupId vgId; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; - char walPath[TSDB_FILENAME_LEN]; + SWal* pWal; void* rpcClient; int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); void* queue; diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 516bef4d48..9cc05d44a9 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -27,10 +27,14 @@ extern "C" { #include "taosdef.h" typedef struct SSyncRaftEntry { - SyncTerm term; - SyncIndex index; - SSyncBuffer data; - int8_t flag; + uint32_t bytes; + uint32_t msgType; + SyncTerm term; + SyncIndex index; + int8_t flag; + uint32_t dataLen; + char data[]; + } SSyncRaftEntry; #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index ee971062cf..8205f19d91 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -24,27 +24,42 @@ extern "C" { #include #include #include "syncInt.h" +#include "syncRaftEntry.h" #include "taosdef.h" -int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf); +typedef struct SSyncLogStoreData { + SSyncNode* pSyncNode; + SWal* pWal; +} SSyncLogStoreData; -// get one log entry, user need to free pBuf->data -int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf); +SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); -// update log store commit index with "index" -int32_t raftLogUpdateCommitIndex(struct SSyncLogStore* pLogStore, SyncIndex index); +void logStoreDestory(SSyncLogStore* pLogStore); -// truncate log with index, entries after the given index (>index) will be deleted -int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex index); +// append one log entry +int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SRpcMsg* pEntry); -// return commit index of log -SyncIndex raftLogGetCommitIndex(struct SSyncLogStore* pLogStore); +// get one log entry, user need to free pEntry->pCont +int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pEntry); + +// truncate log with index, entries after the given index (>=index) will be deleted +int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); // return index of last entry -SyncIndex raftLogGetLastIndex(struct SSyncLogStore* pLogStore); +SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); // return term of last entry -SyncTerm raftLogGetLastTerm(struct SSyncLogStore* pLogStore); +SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); + +// update log store commit index with "index" +int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); + +// return commit index of log +SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); + +cJSON* logStore2Json(SSyncLogStore* pLogStore); + +char* logStore2Str(SSyncLogStore* pLogStore); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index ae9cfe8d01..d437e459b9 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -45,8 +45,8 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted); bool voteGrantedMajority(SVotesGranted *pVotesGranted); void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); -cJSON * voteGranted2Json(SVotesGranted *pVotesGranted); -char * voteGranted2Str(SVotesGranted *pVotesGranted); +cJSON *voteGranted2Json(SVotesGranted *pVotesGranted); +char *voteGranted2Str(SVotesGranted *pVotesGranted); // SVotesRespond ----------------------------- typedef struct SVotesRespond { @@ -62,8 +62,8 @@ void votesRespondDestory(SVotesRespond *pVotesRespond); bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term); -cJSON * votesRespond2Json(SVotesRespond *pVotesRespond); -char * votesRespond2Str(SVotesRespond *pVotesRespond); +cJSON *votesRespond2Json(SVotesRespond *pVotesRespond); +char *votesRespond2Str(SVotesRespond *pVotesRespond); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3b8d716dbe..a7663be3a5 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -18,8 +18,10 @@ #include "syncAppendEntries.h" #include "syncAppendEntriesReply.h" #include "syncEnv.h" +#include "syncIndexMgr.h" #include "syncInt.h" #include "syncRaft.h" +#include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncRequestVote.h" #include "syncRequestVoteReply.h" @@ -78,7 +80,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->vgId = pSyncInfo->vgId; pSyncNode->syncCfg = pSyncInfo->syncCfg; memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path)); - memcpy(pSyncNode->walPath, pSyncInfo->walPath, sizeof(pSyncNode->walPath)); + pSyncNode->pWal = pSyncInfo->pWal; pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; pSyncNode->queue = pSyncInfo->queue; @@ -114,20 +116,26 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { // init life cycle - // init server vars + // init TLA+ server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; - pSyncNode->pRaftStore = raftStoreOpen(pSyncInfo->walPath); assert(pSyncNode->pRaftStore != NULL); - // init candidate vars + // init TLA+ candidate vars pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode); assert(pSyncNode->pVotesGranted != NULL); pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode); assert(pSyncNode->pVotesRespond != NULL); - // init leader vars - pSyncNode->pNextIndex = NULL; - pSyncNode->pMatchIndex = NULL; + // init TLA+ leader vars + pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode); + assert(pSyncNode->pNextIndex != NULL); + pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode); + assert(pSyncNode->pMatchIndex != NULL); + + // init TLA+ log vars + pSyncNode->pLogStore = logStoreCreate(pSyncNode); + assert(pSyncNode->pLogStore != NULL); + pSyncNode->commitIndex = 0; // init ping timer pSyncNode->pPingTimer = NULL; @@ -177,7 +185,8 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { // init by SSyncInfo cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId); cJSON_AddStringToObject(pRoot, "path", pSyncNode->path); - cJSON_AddStringToObject(pRoot, "walPath", pSyncNode->walPath); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal); + cJSON_AddStringToObject(pRoot, "pWal", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient); cJSON_AddStringToObject(pRoot, "rpcClient", u64buf); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index e467057c8f..d9b053b42d 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -14,46 +14,61 @@ */ #include "syncRaftLog.h" +#include "wal.h" -int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) { return 0; } +SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { + SSyncLogStore* pLogStore = malloc(sizeof(SSyncLogStore)); + assert(pLogStore != NULL); -// get one log entry, user need to free pBuf->data -int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf) { return 0; } + pLogStore->data = malloc(sizeof(SSyncLogStoreData)); + assert(pLogStore->data != NULL); -// TLA+ Spec -// \* Leader i advances its commitIndex. -// \* This is done as a separate step from handling AppendEntries responses, -// \* in part to minimize atomic regions, and in part so that leaders of -// \* single-server clusters are able to mark entries committed. -// AdvanceCommitIndex(i) == -// /\ state[i] = Leader -// /\ LET \* The set of servers that agree up through index. -// Agree(index) == {i} \cup {k \in Server : -// matchIndex[i][k] >= index} -// \* The maximum indexes for which a quorum agrees -// agreeIndexes == {index \in 1..Len(log[i]) : -// Agree(index) \in Quorum} -// \* New value for commitIndex'[i] -// newCommitIndex == -// IF /\ agreeIndexes /= {} -// /\ log[i][Max(agreeIndexes)].term = currentTerm[i] -// THEN -// Max(agreeIndexes) -// ELSE -// commitIndex[i] -// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex] -// /\ UNCHANGED <> -// -int32_t raftLogupdateCommitIndex(struct SSyncLogStore* pLogStore, SyncIndex index) { return 0; } + SSyncLogStoreData* pData = pLogStore->data; + pData->pSyncNode = pSyncNode; + pData->pWal = pSyncNode->pWal; -// truncate log with index, entries after the given index (>index) will be deleted -int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex index) { return 0; } + pLogStore->appendEntry = logStoreAppendEntry; + pLogStore->getEntry = logStoreGetEntry; + pLogStore->truncate = logStoreTruncate; + pLogStore->getLastIndex = logStoreLastIndex; + pLogStore->getLastTerm = logStoreLastTerm; + pLogStore->updateCommitIndex = logStoreUpdateCommitIndex; + pLogStore->getCommitIndex = logStoreGetCommitIndex; +} -// return commit index of log -SyncIndex raftLogGetCommitIndex(struct SSyncLogStore* pLogStore) { return 0; } +void logStoreDestory(SSyncLogStore* pLogStore) { + if (pLogStore != NULL) { + free(pLogStore->data); + free(pLogStore); + } +} + +// append one log entry +int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SRpcMsg* pEntry) {} + +// get one log entry, user need to free pEntry->pCont +int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pEntry) {} + +// truncate log with index, entries after the given index (>=index) will be deleted +int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {} // return index of last entry -SyncIndex raftLogGetLastIndex(struct SSyncLogStore* pLogStore) { return 0; } +SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {} // return term of last entry -SyncTerm raftLogGetLastTerm(struct SSyncLogStore* pLogStore) { return 0; } \ No newline at end of file +SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {} + +// update log store commit index with "index" +int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {} + +// return commit index of log +SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {} + +cJSON* logStore2Json(SSyncLogStore* pLogStore) {} + +char* logStore2Str(SSyncLogStore* pLogStore) { + cJSON* pJson = logStore2Json(pLogStore); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} \ No newline at end of file diff --git a/source/libs/sync/test/syncIndexMgrTest.cpp b/source/libs/sync/test/syncIndexMgrTest.cpp index 4e4cd9222b..9eb7b22b8e 100644 --- a/source/libs/sync/test/syncIndexMgrTest.cpp +++ b/source/libs/sync/test/syncIndexMgrTest.cpp @@ -34,7 +34,6 @@ SSyncNode* syncNodeInit() { syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); - snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp index 669c4e68a5..4aac24487e 100644 --- a/source/libs/sync/test/syncInitTest.cpp +++ b/source/libs/sync/test/syncInitTest.cpp @@ -31,7 +31,6 @@ SSyncNode* syncNodeInit() { syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); - snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 450e097cc8..ae7977f270 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -26,7 +26,6 @@ SSyncNode* doSync(int myIndex) { syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./path"); - snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./wal_path"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp index 3edde509f8..a448ad44b6 100644 --- a/source/libs/sync/test/syncVotesGrantedTest.cpp +++ b/source/libs/sync/test/syncVotesGrantedTest.cpp @@ -33,7 +33,6 @@ SSyncNode* syncNodeInit() { syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); - snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; diff --git a/source/libs/sync/test/syncVotesRespondTest.cpp b/source/libs/sync/test/syncVotesRespondTest.cpp index 74d42cd531..5cff8e0e26 100644 --- a/source/libs/sync/test/syncVotesRespondTest.cpp +++ b/source/libs/sync/test/syncVotesRespondTest.cpp @@ -33,7 +33,6 @@ SSyncNode* syncNodeInit() { syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); - snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; From fa7f441f425f47b0aeced5e656502f993877a55d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 9 Mar 2022 15:07:43 +0800 Subject: [PATCH 02/12] sync refactor --- include/libs/sync/sync.h | 25 ++--- source/libs/sync/inc/syncAppendEntries.h | 1 - source/libs/sync/inc/syncAppendEntriesReply.h | 1 - source/libs/sync/inc/syncOnMessage.h | 1 - source/libs/sync/inc/syncRaft.h | 93 ------------------- source/libs/sync/inc/syncRaftStore.h | 1 - source/libs/sync/inc/syncRequestVote.h | 1 - source/libs/sync/inc/syncRequestVoteReply.h | 1 - source/libs/sync/inc/syncSnapshot.h | 1 - source/libs/sync/inc/syncTimeout.h | 1 - source/libs/sync/inc/syncVoteMgr.h | 8 +- source/libs/sync/src/syncIO.c | 2 +- source/libs/sync/src/syncMain.c | 1 - source/libs/sync/src/syncMessage.c | 1 - source/libs/sync/src/syncRaft.c | 70 -------------- source/libs/sync/src/syncSnapshot.c | 1 - source/libs/sync/test/syncEnqTest.cpp | 1 + 17 files changed, 19 insertions(+), 191 deletions(-) delete mode 100644 source/libs/sync/inc/syncRaft.h delete mode 100644 source/libs/sync/src/syncRaft.c diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index ccbeb00bfd..c83082e3e4 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -24,6 +24,7 @@ extern "C" { #include #include "taosdef.h" #include "trpc.h" +#include "wal.h" typedef uint64_t SyncNodeId; typedef int32_t SyncGroupId; @@ -93,19 +94,13 @@ typedef struct SSyncLogStore { void* data; // append one log entry - int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SRpcMsg* pBuf); + int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SRpcMsg* pEntry); - // get one log entry, user need to free pBuf->data - int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pBuf); + // get one log entry, user need to free pEntry->pCont + int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pEntry); - // update log store commit index with "index" - int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index); - - // truncate log with index, entries after the given index (>index) will be deleted - int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex index); - - // return commit index of log - SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore); + // truncate log with index, entries after the given index (>=index) will be deleted + int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); // return index of last entry SyncIndex (*getLastIndex)(struct SSyncLogStore* pLogStore); @@ -113,6 +108,12 @@ typedef struct SSyncLogStore { // return term of last entry SyncTerm (*getLastTerm)(struct SSyncLogStore* pLogStore); + // update log store commit index with "index" + int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index); + + // return commit index of log + SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore); + } SSyncLogStore; // raft need to persist two variables in storage: currentTerm, voteFor @@ -134,7 +135,7 @@ typedef struct SSyncInfo { SyncGroupId vgId; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; - char walPath[TSDB_FILENAME_LEN]; + SWal* pWal; SSyncFSM* pFsm; void* rpcClient; diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index 35d3046d66..5999ef8300 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -25,7 +25,6 @@ extern "C" { #include #include "syncInt.h" #include "syncMessage.h" -#include "syncRaft.h" #include "taosdef.h" // TLA+ Spec diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index 75b82aa531..c0c1f76707 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -25,7 +25,6 @@ extern "C" { #include #include "syncInt.h" #include "syncMessage.h" -#include "syncRaft.h" #include "taosdef.h" // TLA+ Spec diff --git a/source/libs/sync/inc/syncOnMessage.h b/source/libs/sync/inc/syncOnMessage.h index 8eae4fed4d..7cb186a812 100644 --- a/source/libs/sync/inc/syncOnMessage.h +++ b/source/libs/sync/inc/syncOnMessage.h @@ -23,7 +23,6 @@ extern "C" { #include #include #include -#include "syncRaft.h" #include "taosdef.h" #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncRaft.h b/source/libs/sync/inc/syncRaft.h deleted file mode 100644 index bc5cf26a4c..0000000000 --- a/source/libs/sync/inc/syncRaft.h +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_LIBS_SYNC_RAFT_H -#define _TD_LIBS_SYNC_RAFT_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include -#include -#include "sync.h" -#include "syncMessage.h" -#include "taosdef.h" - -#if 0 - -typedef struct SRaftId { - SyncNodeId addr; - SyncGroupId vgId; -} SRaftId; - -typedef struct SRaft { - SRaftId id; - SSyncFSM* pFsm; - - int32_t (*FpPing)(struct SRaft* ths, const RaftPing* pMsg); - - int32_t (*FpOnPing)(struct SRaft* ths, RaftPing* pMsg); - - int32_t (*FpOnPingReply)(struct SRaft* ths, RaftPingReply* pMsg); - - int32_t (*FpRequestVote)(struct SRaft* ths, const RaftRequestVote* pMsg); - - int32_t (*FpOnRequestVote)(struct SRaft* ths, RaftRequestVote* pMsg); - - int32_t (*FpOnRequestVoteReply)(struct SRaft* ths, RaftRequestVoteReply* pMsg); - - int32_t (*FpAppendEntries)(struct SRaft* ths, const RaftAppendEntries* pMsg); - - int32_t (*FpOnAppendEntries)(struct SRaft* ths, RaftAppendEntries* pMsg); - - int32_t (*FpOnAppendEntriesReply)(struct SRaft* ths, RaftAppendEntriesReply* pMsg); - -} SRaft; - -SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm); - -void raftClose(SRaft* pRaft); - -static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg); - -static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg); - -static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg); - -static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg); - -static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg); - -static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg); - -static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg); - -static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg); - -static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg); - -int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak); - -static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft); - -#endif - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_LIBS_SYNC_RAFT_H*/ diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 591a5b9963..1c25b799b4 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -25,7 +25,6 @@ extern "C" { #include #include "cJSON.h" #include "syncInt.h" -#include "syncRaft.h" #include "taosdef.h" #define RAFT_STORE_BLOCK_SIZE 512 diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index 8bb4976de2..fd4ccd5371 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -25,7 +25,6 @@ extern "C" { #include #include "syncInt.h" #include "syncMessage.h" -#include "syncRaft.h" #include "taosdef.h" // TLA+ Spec diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index ab9430b857..bcaf71a541 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -25,7 +25,6 @@ extern "C" { #include #include "syncInt.h" #include "syncMessage.h" -#include "syncRaft.h" #include "taosdef.h" // TLA+ Spec diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 89fcb230fb..611f33a0f2 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -24,7 +24,6 @@ extern "C" { #include #include #include "syncInt.h" -#include "syncRaft.h" #include "taosdef.h" int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot); diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index efd5aae48e..25c26c909d 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -25,7 +25,6 @@ extern "C" { #include #include "syncInt.h" #include "syncMessage.h" -#include "syncRaft.h" #include "taosdef.h" // TLA+ Spec diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index d437e459b9..ae9cfe8d01 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -45,8 +45,8 @@ void voteGrantedDestroy(SVotesGranted *pVotesGranted); bool voteGrantedMajority(SVotesGranted *pVotesGranted); void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); -cJSON *voteGranted2Json(SVotesGranted *pVotesGranted); -char *voteGranted2Str(SVotesGranted *pVotesGranted); +cJSON * voteGranted2Json(SVotesGranted *pVotesGranted); +char * voteGranted2Str(SVotesGranted *pVotesGranted); // SVotesRespond ----------------------------- typedef struct SVotesRespond { @@ -62,8 +62,8 @@ void votesRespondDestory(SVotesRespond *pVotesRespond); bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); void votesRespondReset(SVotesRespond *pVotesRespond, SyncTerm term); -cJSON *votesRespond2Json(SVotesRespond *pVotesRespond); -char *votesRespond2Str(SVotesRespond *pVotesRespond); +cJSON * votesRespond2Json(SVotesRespond *pVotesRespond); +char * votesRespond2Str(SVotesRespond *pVotesRespond); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index d37c821a24..7edf561f5b 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -15,7 +15,7 @@ #include "syncIO.h" #include -#include "syncOnMessage.h" +#include "syncMessage.h" #include "tglobal.h" #include "ttimer.h" #include "tutil.h" diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a7663be3a5..4d19444abd 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -20,7 +20,6 @@ #include "syncEnv.h" #include "syncIndexMgr.h" #include "syncInt.h" -#include "syncRaft.h" #include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncRequestVote.h" diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 14f139a803..6732cd3958 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -14,7 +14,6 @@ */ #include "syncMessage.h" -#include "syncRaft.h" #include "syncUtil.h" #include "tcoding.h" diff --git a/source/libs/sync/src/syncRaft.c b/source/libs/sync/src/syncRaft.c deleted file mode 100644 index b07c6ea797..0000000000 --- a/source/libs/sync/src/syncRaft.c +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "syncRaft.h" -#include "sync.h" - -#if 0 - -SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) { - SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft)); - assert(pRaft != NULL); - - pRaft->id = raftId; - pRaft->pFsm = pFsm; - - pRaft->FpPing = doRaftPing; - pRaft->FpOnPing = onRaftPing; - pRaft->FpOnPingReply = onRaftPingReply; - - pRaft->FpRequestVote = doRaftRequestVote; - pRaft->FpOnRequestVote = onRaftRequestVote; - pRaft->FpOnRequestVoteReply = onRaftRequestVoteReply; - - pRaft->FpAppendEntries = doRaftAppendEntries; - pRaft->FpOnAppendEntries = onRaftAppendEntries; - pRaft->FpOnAppendEntriesReply = onRaftAppendEntriesReply; - - return pRaft; -} - -void raftClose(SRaft* pRaft) { - assert(pRaft != NULL); - free(pRaft); -} - -static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg) { return 0; } - -static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg) { return 0; } - -static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg) { return 0; } - -static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg) { return 0; } - -static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg) { return 0; } - -static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg) { return 0; } - -static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg) { return 0; } - -static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg) { return 0; } - -static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg) { return 0; } - -int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; } - -static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; } - -#endif \ No newline at end of file diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index da194780ff..42b2bd993b 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -14,7 +14,6 @@ */ #include "syncSnapshot.h" -#include "syncRaft.h" int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index e2bc9a73ae..e1706bb40b 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -2,6 +2,7 @@ #include "syncEnv.h" #include "syncIO.h" #include "syncInt.h" +#include "syncMessage.h" #include "syncRaftStore.h" void logTest() { From d87411116b3343e754c9564cedef23717a55b70d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 9 Mar 2022 16:34:34 +0800 Subject: [PATCH 03/12] sync refactor --- include/libs/sync/sync.h | 2 +- source/libs/sync/inc/syncMessage.h | 15 +++++- source/libs/sync/src/syncMain.c | 25 ++++++++-- source/libs/sync/src/syncMessage.c | 76 +++++++++++++++++++++++++++++- 4 files changed, 112 insertions(+), 6 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index c83082e3e4..1e5aa18010 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -154,7 +154,7 @@ void syncCleanUp(); int64_t syncStart(const SSyncInfo* pSyncInfo); void syncStop(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); ESyncState syncGetMyRole(int64_t rid); void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole); diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 3405f0f6cc..7f54dee426 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -123,12 +123,25 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId); typedef struct SyncClientRequest { uint32_t bytes; uint32_t msgType; - int64_t seqNum; + uint32_t originalRpcType; + uint64_t seqNum; bool isWeak; uint32_t dataLen; char data[]; } SyncClientRequest; +#define SYNC_CLIENT_REQUEST_FIX_LEN \ + (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t) + sizeof(bool) + sizeof(uint32_t)) + +SyncClientRequest* syncClientRequestBuild(uint32_t dataLen); +void syncClientRequestDestroy(SyncClientRequest* pMsg); +void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen); +void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg); +void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); +void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg); +cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg); +SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak); + // --------------------------------------------- typedef struct SyncClientRequestReply { uint32_t bytes; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 4d19444abd..7e8754b684 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -60,13 +60,32 @@ int64_t syncStart(const SSyncInfo* pSyncInfo) { return 0; } -void syncStop(int64_t rid) {} +void syncStop(int64_t rid) { + SSyncNode* pSyncNode = NULL; // get pointer from rid + syncNodeClose(pSyncNode); +} int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; } -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; } +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { + SSyncNode* pSyncNode = NULL; // get pointer from rid + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak); + SRpcMsg rpcMsg; + syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + syncClientRequestDestroy(pSyncMsg); + } else { + sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state)); + return -1; // need define err code !! + } + return 0; +} -ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } +ESyncState syncGetMyRole(int64_t rid) { + SSyncNode* pSyncNode = NULL; // get pointer from rid + return pSyncNode->state; +} void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 6732cd3958..3af073541b 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -35,7 +35,8 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { pRoot = syncPingReply2Json(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) { - pRoot = syncRpcUnknownMsg2Json(); + SyncClientRequest* pSyncMsg = (SyncClientRequest*)pRpcMsg->pCont; + pRoot = syncClientRequest2Json(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) { pRoot = syncRpcUnknownMsg2Json(); @@ -148,6 +149,7 @@ SyncPing* syncPingBuild(uint32_t dataLen) { pMsg->bytes = bytes; pMsg->msgType = SYNC_PING; pMsg->dataLen = dataLen; + return pMsg; } void syncPingDestroy(SyncPing* pMsg) { @@ -246,6 +248,7 @@ SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { pMsg->bytes = bytes; pMsg->msgType = SYNC_PING_REPLY; pMsg->dataLen = dataLen; + return pMsg; } void syncPingReplyDestroy(SyncPingReply* pMsg) { @@ -336,6 +339,73 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId) return pMsg; } +// ---- message process SyncClientRequest---- +SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { + uint32_t bytes = SYNC_CLIENT_REQUEST_FIX_LEN + dataLen; + SyncClientRequest* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_CLIENT_REQUEST; + pMsg->seqNum = 0; + pMsg->isWeak = false; + pMsg->dataLen = dataLen; + return pMsg; +} + +void syncClientRequestDestroy(SyncClientRequest* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); +} + +void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncClientRequestSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg) { + syncClientRequestDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->seqNum); + cJSON_AddStringToObject(pRoot, "seqNum", u64buf); + cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak); + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncClientRequest", pRoot); + return pJson; +} + +SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak) { + SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen); + pMsg->originalRpcType = pOriginalRpcMsg->msgType; + pMsg->seqNum = seqNum; + pMsg->isWeak = isWeak; + memcpy(pMsg->data, pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen); + return pMsg; +} + // ---- message process SyncRequestVote---- SyncRequestVote* syncRequestVoteBuild() { uint32_t bytes = sizeof(SyncRequestVote); @@ -343,6 +413,7 @@ SyncRequestVote* syncRequestVoteBuild() { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->msgType = SYNC_REQUEST_VOTE; + return pMsg; } void syncRequestVoteDestroy(SyncRequestVote* pMsg) { @@ -428,6 +499,7 @@ SyncRequestVoteReply* SyncRequestVoteReplyBuild() { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->msgType = SYNC_REQUEST_VOTE_REPLY; + return pMsg; } void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg) { @@ -511,6 +583,7 @@ SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen) { pMsg->bytes = bytes; pMsg->msgType = SYNC_APPEND_ENTRIES; pMsg->dataLen = dataLen; + return pMsg; } void syncAppendEntriesDestroy(SyncAppendEntries* pMsg) { @@ -603,6 +676,7 @@ SyncAppendEntriesReply* syncAppendEntriesReplyBuild() { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->msgType = SYNC_APPEND_ENTRIES_REPLY; + return pMsg; } void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg) { From 41516e024a99952279e40b6b2f9ae1cdbab50d36 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 9 Mar 2022 18:33:41 +0800 Subject: [PATCH 04/12] sync refactor --- source/libs/sync/inc/syncInt.h | 8 +- source/libs/sync/inc/syncMessage.h | 1 + source/libs/sync/src/syncIO.c | 17 ++-- source/libs/sync/src/syncMain.c | 15 +++- source/libs/sync/src/syncMessage.c | 7 ++ source/libs/sync/test/syncEnvTest.cpp | 31 +++---- source/libs/sync/test/syncIndexMgrTest.cpp | 2 +- source/libs/sync/test/syncInitTest.cpp | 2 +- source/libs/sync/test/syncPingTest.cpp | 84 ++++++++++++------- .../libs/sync/test/syncVotesGrantedTest.cpp | 2 +- .../libs/sync/test/syncVotesRespondTest.cpp | 2 +- 11 files changed, 106 insertions(+), 65 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 1ca705441d..2932240ec1 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -116,6 +116,7 @@ typedef struct SSyncNode { SyncGroupId vgId; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; + char raftStorePath[TSDB_FILENAME_LEN * 2]; SWal* pWal; void* rpcClient; int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); @@ -195,8 +196,6 @@ typedef struct SSyncNode { SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); void syncNodeClose(SSyncNode* pSyncNode); -cJSON* syncNode2Json(const SSyncNode* pSyncNode); -char* syncNode2Str(const SSyncNode* pSyncNode); int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); @@ -213,6 +212,11 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); +// for debug +cJSON* syncNode2Json(const SSyncNode* pSyncNode); +char* syncNode2Str(const SSyncNode* pSyncNode); +void syncNodePrint(char* s, const SSyncNode* pSyncNode); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 7f54dee426..f41a423468 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -46,6 +46,7 @@ typedef enum ESyncMessageType { // --------------------------------------------- cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); cJSON* syncRpcUnknownMsg2Json(); +char* syncRpcMsg2Str(SRpcMsg* pRpcMsg); // --------------------------------------------- typedef enum ESyncTimeoutType { diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 7edf561f5b..2cc56443ce 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -220,12 +220,17 @@ static void *syncIOConsumerFunc(void *param) { while (1) { int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL); sTrace("syncIOConsumerFunc %d msgs are received", numOfMsgs); - if (numOfMsgs <= 0) break; + if (numOfMsgs <= 0) { + break; + } for (int i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pRpcMsg); + + char *s = syncRpcMsg2Str(pRpcMsg); sTrace("syncIOConsumerFunc get item from queue: msgType:%d contLen:%d msg:%s", pRpcMsg->msgType, pRpcMsg->contLen, - (char *)(pRpcMsg->pCont)); + s); + free(s); if (pRpcMsg->msgType == SYNC_PING) { if (io->FpOnSyncPing != NULL) { @@ -247,7 +252,7 @@ static void *syncIOConsumerFunc(void *param) { } } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { - if (io->FpOnSyncRequestVote) { + if (io->FpOnSyncRequestVote != NULL) { SyncRequestVote *pSyncMsg; pSyncMsg = syncRequestVoteBuild(pRpcMsg->contLen); syncRequestVoteFromRpcMsg(pRpcMsg, pSyncMsg); @@ -256,7 +261,7 @@ static void *syncIOConsumerFunc(void *param) { } } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { - if (io->FpOnSyncRequestVoteReply) { + if (io->FpOnSyncRequestVoteReply != NULL) { SyncRequestVoteReply *pSyncMsg; pSyncMsg = SyncRequestVoteReplyBuild(); syncRequestVoteReplyFromRpcMsg(pRpcMsg, pSyncMsg); @@ -265,7 +270,7 @@ static void *syncIOConsumerFunc(void *param) { } } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) { - if (io->FpOnSyncAppendEntries) { + if (io->FpOnSyncAppendEntries != NULL) { SyncAppendEntries *pSyncMsg; pSyncMsg = syncAppendEntriesBuild(pRpcMsg->contLen); syncAppendEntriesFromRpcMsg(pRpcMsg, pSyncMsg); @@ -274,7 +279,7 @@ static void *syncIOConsumerFunc(void *param) { } } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) { - if (io->FpOnSyncAppendEntriesReply) { + if (io->FpOnSyncAppendEntriesReply != NULL) { SyncAppendEntriesReply *pSyncMsg; pSyncMsg = syncAppendEntriesReplyBuild(); syncAppendEntriesReplyFromRpcMsg(pRpcMsg, pSyncMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7e8754b684..29803ae560 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -98,6 +98,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->vgId = pSyncInfo->vgId; pSyncNode->syncCfg = pSyncInfo->syncCfg; memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path)); + snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s/raft_store.json", pSyncInfo->path); pSyncNode->pWal = pSyncInfo->pWal; pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; @@ -136,6 +137,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { // init TLA+ server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; + pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath); assert(pSyncNode->pRaftStore != NULL); // init TLA+ candidate vars @@ -325,6 +327,13 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { return serialized; } +void syncNodePrint(char* s, const SSyncNode* pSyncNode) { + char* ss = syncNode2Str(pSyncNode); + // sTrace("syncNodePrint: %s [len:%lu]| %s", s, strlen(ss), ss); + fprintf(stderr, "syncNodePrint: %s [len:%lu]| %s", s, strlen(ss), ss); + free(ss); +} + int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); @@ -499,6 +508,8 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { } static void syncNodeEqPingTimer(void* param, void* tmrId) { + sTrace("<-- syncNodeEqPingTimer -->"); + SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) { SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock), @@ -511,7 +522,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { // reset timer ms // pSyncNode->pingTimerMS += 100; - taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { sTrace("syncNodeEqPingTimer: pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu", pSyncNode->pingTimerLogicClock, @@ -557,7 +568,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { // reset timer ms // pSyncNode->heartbeatTimerMS += 100; - taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pHeartbeatTimer); } else { sTrace("syncNodeEqHeartbeatTimer: heartbeatTimerLogicClock:%lu, heartbeatTimerLogicClockUser:%lu", diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 3af073541b..11f823a048 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -76,6 +76,13 @@ cJSON* syncRpcUnknownMsg2Json() { return pJson; } +char* syncRpcMsg2Str(SRpcMsg* pRpcMsg) { + cJSON* pJson = syncRpcMsg2Json(pRpcMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + // ---- message process SyncTimeout---- SyncTimeout* syncTimeoutBuild() { uint32_t bytes = sizeof(SyncTimeout); diff --git a/source/libs/sync/test/syncEnvTest.cpp b/source/libs/sync/test/syncEnvTest.cpp index 1ac4357c5a..101c0efe9a 100644 --- a/source/libs/sync/test/syncEnvTest.cpp +++ b/source/libs/sync/test/syncEnvTest.cpp @@ -3,6 +3,7 @@ #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" +#include "ttime.h" void logTest() { sTrace("--- sync log test: trace"); @@ -13,24 +14,13 @@ void logTest() { sFatal("--- sync log test: fatal"); } -void doSync() { - SSyncInfo syncInfo; - syncInfo.vgId = 1; +void *pTimer = NULL; +void *pTimerMgr = NULL; +int g = 300; - SSyncCfg* pCfg = &syncInfo.syncCfg; - pCfg->replicaNum = 3; - - pCfg->nodeInfo[0].nodePort = 7010; - taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); - - pCfg->nodeInfo[1].nodePort = 7110; - taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); - - pCfg->nodeInfo[2].nodePort = 7210; - taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); - - SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); - assert(pSyncNode != NULL); +static void timerFp(void *param, void *tmrId) { + printf("param:%p, tmrId:%p, pTimer:%p, pTimerMgr:%p \n", param, tmrId, pTimer, pTimerMgr); + taosTmrReset(timerFp, 1000, param, pTimerMgr, &pTimer); } int main() { @@ -41,13 +31,12 @@ int main() { logTest(); - // ret = syncIOStart(); - // assert(ret == 0); - ret = syncEnvStart(); assert(ret == 0); - // doSync(); + // timer + pTimerMgr = taosTmrInit(1000, 50, 10000, "SYNC-ENV-TEST"); + taosTmrStart(timerFp, 1000, &g, pTimerMgr); while (1) { taosMsleep(1000); diff --git a/source/libs/sync/test/syncIndexMgrTest.cpp b/source/libs/sync/test/syncIndexMgrTest.cpp index 9eb7b22b8e..6bad8f09cf 100644 --- a/source/libs/sync/test/syncIndexMgrTest.cpp +++ b/source/libs/sync/test/syncIndexMgrTest.cpp @@ -33,7 +33,7 @@ SSyncNode* syncNodeInit() { syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp index 4aac24487e..f665c5a612 100644 --- a/source/libs/sync/test/syncInitTest.cpp +++ b/source/libs/sync/test/syncInitTest.cpp @@ -30,7 +30,7 @@ SSyncNode* syncNodeInit() { syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index ae7977f270..83f1f67eb1 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -1,8 +1,10 @@ +#include #include #include "syncEnv.h" #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" +#include "syncUtil.h" void logTest() { sTrace("--- sync log test: trace"); @@ -13,58 +15,65 @@ void logTest() { sFatal("--- sync log test: fatal"); } -uint16_t ports[3] = {7010, 7110, 7210}; +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 3; +int32_t myIndex = 0; -SSyncNode* doSync(int myIndex) { - SSyncFSM* pFsm; +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM* pFsm; - SSyncInfo syncInfo; - syncInfo.vgId = 1; +SSyncNode* syncNodeInit() { + syncInfo.vgId = 1234; syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./path"); + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; - pCfg->replicaNum = 3; + pCfg->replicaNum = replicaNum; - pCfg->nodeInfo[0].nodePort = ports[0]; - snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1"); - // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); - - pCfg->nodeInfo[1].nodePort = ports[1]; - snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1"); - // taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); - - pCfg->nodeInfo[2].nodePort = ports[2]; - snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1"); - // taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->pSyncNode = pSyncNode; return pSyncNode; } -void timerPingAll(void* param, void* tmrId) { - SSyncNode* pSyncNode = (SSyncNode*)param; - syncNodePingAll(pSyncNode); +SSyncNode* syncInitTest() { return syncNodeInit(); } + +void initRaftId(SSyncNode* pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char* s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } } int main(int argc, char** argv) { - // taosInitLog((char*)"syncPingTest.log", 100000, 10); + // taosInitLog((char *)"syncTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; - logTest(); - - int myIndex = 0; + myIndex = 0; if (argc >= 2) { myIndex = atoi(argv[1]); } @@ -75,30 +84,45 @@ int main(int argc, char** argv) { ret = syncEnvStart(); assert(ret == 0); - SSyncNode* pSyncNode = doSync(myIndex); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + SSyncNode* pSyncNode = syncInitTest(); + assert(pSyncNode != NULL); + syncNodePrint((char*)"----1", pSyncNode); + initRaftId(pSyncNode); + + //--------------------------- + + sTrace("syncNodeStartPingTimer ..."); ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); + syncNodePrint((char*)"----2", pSyncNode); + sTrace("sleep ..."); taosMsleep(10000); + sTrace("syncNodeStopPingTimer ..."); ret = syncNodeStopPingTimer(pSyncNode); assert(ret == 0); + syncNodePrint((char*)"----3", pSyncNode); - taosMsleep(10000); + sTrace("sleep ..."); + taosMsleep(5000); + sTrace("syncNodeStartPingTimer ..."); ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); + syncNodePrint((char*)"----4", pSyncNode); + sTrace("sleep ..."); taosMsleep(10000); + sTrace("syncNodeStopPingTimer ..."); ret = syncNodeStopPingTimer(pSyncNode); assert(ret == 0); + syncNodePrint((char*)"----5", pSyncNode); while (1) { + sTrace("while 1 sleep ..."); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp index a448ad44b6..504fa3034a 100644 --- a/source/libs/sync/test/syncVotesGrantedTest.cpp +++ b/source/libs/sync/test/syncVotesGrantedTest.cpp @@ -32,7 +32,7 @@ SSyncNode* syncNodeInit() { syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; diff --git a/source/libs/sync/test/syncVotesRespondTest.cpp b/source/libs/sync/test/syncVotesRespondTest.cpp index 5cff8e0e26..0b6abef212 100644 --- a/source/libs/sync/test/syncVotesRespondTest.cpp +++ b/source/libs/sync/test/syncVotesRespondTest.cpp @@ -32,7 +32,7 @@ SSyncNode* syncNodeInit() { syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; From c462ef61e3082da67721dcfdb43aa8b9d82316d1 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 9 Mar 2022 18:35:57 +0800 Subject: [PATCH 05/12] sync refactor --- source/libs/sync/src/syncMain.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 29803ae560..5cd7149b72 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -544,7 +544,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { // reset timer ms pSyncNode->electTimerMS = syncUtilElectRandomMS(); - taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { sTrace("syncNodeEqElectTimer: electTimerLogicClock:%lu, electTimerLogicClockUser:%lu", From 3dfb9bb02283a962835bbe55620833ee0dcab2bc Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 9 Mar 2022 18:56:39 +0800 Subject: [PATCH 06/12] sync refactor --- source/libs/sync/inc/syncMessage.h | 3 +-- source/libs/sync/inc/syncRaftEntry.h | 9 +++++++-- source/libs/sync/inc/syncRaftLog.h | 4 ++-- source/libs/sync/src/syncRaftEntry.c | 4 ++++ source/libs/sync/src/syncRaftLog.c | 4 ++-- 5 files changed, 16 insertions(+), 8 deletions(-) diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index f41a423468..dcd8f7c74e 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -24,8 +24,7 @@ extern "C" { #include #include #include "cJSON.h" -#include "sync.h" -#include "syncRaftEntry.h" +#include "syncInt.h" #include "taosdef.h" // encode as uint32 diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 9cc05d44a9..53cf647022 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -24,19 +24,24 @@ extern "C" { #include #include #include "syncInt.h" +#include "syncMessage.h" #include "taosdef.h" typedef struct SSyncRaftEntry { uint32_t bytes; uint32_t msgType; + uint32_t originalRpcType; + uint64_t seqNum; + bool isWeak; SyncTerm term; SyncIndex index; - int8_t flag; uint32_t dataLen; char data[]; - } SSyncRaftEntry; +SSyncRaftEntry* syncEntryBuild(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); +void syncEntryDestory(SSyncRaftEntry* pEntry); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index 8205f19d91..634d06903c 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -37,10 +37,10 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); void logStoreDestory(SSyncLogStore* pLogStore); // append one log entry -int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SRpcMsg* pEntry); +int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); // get one log entry, user need to free pEntry->pCont -int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pEntry); +int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry* pEntry); // truncate log with index, entries after the given index (>=index) will be deleted int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index e525d3c7c2..fff4f1b50b 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -14,3 +14,7 @@ */ #include "syncRaftEntry.h" + +SSyncRaftEntry* syncEntryBuild(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {} + +void syncEntryDestory(SSyncRaftEntry* pEntry) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index d9b053b42d..c4e1feb374 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -44,10 +44,10 @@ void logStoreDestory(SSyncLogStore* pLogStore) { } // append one log entry -int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SRpcMsg* pEntry) {} +int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {} // get one log entry, user need to free pEntry->pCont -int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pEntry) {} +int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry* pEntry) {} // truncate log with index, entries after the given index (>=index) will be deleted int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {} From d083c7974407afb65449368edcc88f4e84401b30 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 9 Mar 2022 19:13:09 +0800 Subject: [PATCH 07/12] sync refactor --- source/libs/sync/inc/syncRaftEntry.h | 8 ++++ source/libs/sync/src/syncRaftEntry.c | 63 +++++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 2 deletions(-) diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 53cf647022..2d77af8835 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -39,8 +39,16 @@ typedef struct SSyncRaftEntry { char data[]; } SSyncRaftEntry; +#define SYNC_ENTRY_FIX_LEN \ + (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t) + sizeof(bool) + sizeof(SyncTerm) + \ + sizeof(SyncIndex) + sizeof(uint32_t)) + SSyncRaftEntry* syncEntryBuild(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); void syncEntryDestory(SSyncRaftEntry* pEntry); +void syncEntrySerialize(const SSyncRaftEntry* pEntry, char* buf, uint32_t bufLen); +void syncEntryDeserialize(const char* buf, uint32_t len, SSyncRaftEntry* pEntry); +cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry); +char* syncEntry2Str(const SSyncRaftEntry* pEntry); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index fff4f1b50b..cd14ea502e 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -15,6 +15,65 @@ #include "syncRaftEntry.h" -SSyncRaftEntry* syncEntryBuild(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) {} +SSyncRaftEntry* syncEntryBuild(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) { + uint32_t bytes = SYNC_ENTRY_FIX_LEN + pMsg->bytes; + SSyncRaftEntry* pEntry = malloc(bytes); + assert(pEntry != NULL); + memset(pEntry, 0, bytes); -void syncEntryDestory(SSyncRaftEntry* pEntry) {} \ No newline at end of file + pEntry->bytes = bytes; + pEntry->msgType = pMsg->msgType; + pEntry->originalRpcType = pMsg->originalRpcType; + pEntry->seqNum = pMsg->seqNum; + pEntry->isWeak = pMsg->isWeak; + pEntry->term = term; + pEntry->index = index; + pEntry->dataLen = pMsg->dataLen; + memcpy(pEntry->data, pMsg->data, pMsg->dataLen); + + return pEntry; +} + +void syncEntryDestory(SSyncRaftEntry* pEntry) { + if (pEntry != NULL) { + free(pEntry); + } +} + +void syncEntrySerialize(const SSyncRaftEntry* pEntry, char* buf, uint32_t bufLen) { + assert(pEntry->bytes <= bufLen); + memcpy(buf, pEntry, pEntry->bytes); +} + +void syncEntryDeserialize(const char* buf, uint32_t len, SSyncRaftEntry* pEntry) { + memcpy(pEntry, buf, len); + assert(len == pEntry->bytes); +} + +cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pEntry->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pEntry->msgType); + cJSON_AddNumberToObject(pRoot, "originalRpcType", pEntry->originalRpcType); + snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->seqNum); + cJSON_AddStringToObject(pRoot, "seqNum", u64buf); + cJSON_AddNumberToObject(pRoot, "isWeak", pEntry->isWeak); + snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->index); + cJSON_AddStringToObject(pRoot, "index", u64buf); + cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SSyncRaftEntry", pRoot); + return pJson; +} + +char* syncEntry2Str(const SSyncRaftEntry* pEntry) { + cJSON* pJson = syncEntry2Json(pEntry); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} \ No newline at end of file From 30be9d3e314523d0a599dc88048e6a62a57fd610 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 9 Mar 2022 20:04:39 +0800 Subject: [PATCH 08/12] sync refactor --- source/libs/sync/inc/syncRaftLog.h | 4 +- source/libs/sync/src/syncRaftLog.c | 65 ++++++++++++++++++++++++++---- 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index 634d06903c..ecdb544302 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -40,7 +40,7 @@ void logStoreDestory(SSyncLogStore* pLogStore); int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); // get one log entry, user need to free pEntry->pCont -int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry* pEntry); +SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index); // truncate log with index, entries after the given index (>=index) will be deleted int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); @@ -57,6 +57,8 @@ int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); // return commit index of log SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); +SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); + cJSON* logStore2Json(SSyncLogStore* pLogStore); char* logStore2Str(SSyncLogStore* pLogStore); diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index c4e1feb374..d5735d9142 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -44,25 +44,76 @@ void logStoreDestory(SSyncLogStore* pLogStore) { } // append one log entry -int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {} +int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + char* buf = malloc(pEntry->bytes); + + syncEntrySerialize(pEntry, buf, pEntry->bytes); + walWrite(pWal, pEntry->index, pEntry->msgType, buf, pEntry->bytes); + walFsync(pWal, true); + + free(buf); +} // get one log entry, user need to free pEntry->pCont -int32_t logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry* pEntry) {} +SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + SSyncRaftEntry* pEntry; + + SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); + walReadWithHandle(pWalHandle, index); + + // need to hold, do not new every time!! + walCloseReadHandle(pWalHandle); + return pEntry; +} // truncate log with index, entries after the given index (>=index) will be deleted -int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) {} +int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + walRollback(pWal, fromIndex); +} // return index of last entry -SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) {} +SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) { + SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore); + SyncIndex lastIndex = pLastEntry->index; + free(pLastEntry); + return lastIndex; +} // return term of last entry -SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) {} +SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { + SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore); + SyncTerm lastTerm = pLastEntry->term; + free(pLastEntry); + return lastTerm; +} // update log store commit index with "index" -int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {} +int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + walCommit(pWal, index); +} // return commit index of log -SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) {} +SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) { + SSyncLogStoreData* pData = pLogStore->data; + return pData->pSyncNode->commitIndex; +} + +SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + SyncIndex lastIndex = walGetLastVer(pWal); + SSyncRaftEntry* pEntry; + pEntry = logStoreGetEntry(pLogStore, lastIndex); + return pEntry; +} cJSON* logStore2Json(SSyncLogStore* pLogStore) {} From 3c9875e955e0f3d1b52f7ef03625c8e0810558a7 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Wed, 9 Mar 2022 20:24:27 +0800 Subject: [PATCH 09/12] sync refactor --- include/libs/sync/sync.h | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 1e5aa18010..68ca9eff17 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -88,16 +88,19 @@ typedef struct SSyncFSM { } SSyncFSM; +struct SSyncRaftEntry; +typedef struct SSyncRaftEntry SSyncRaftEntry; + // abstract definition of log store in raft // SWal implements it typedef struct SSyncLogStore { void* data; // append one log entry - int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SRpcMsg* pEntry); + int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); // get one log entry, user need to free pEntry->pCont - int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pEntry); + SSyncRaftEntry* (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index); // truncate log with index, entries after the given index (>=index) will be deleted int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); From a51e5761266ec0bdeeb12ec2706982ccf4171d96 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 10 Mar 2022 11:21:04 +0800 Subject: [PATCH 10/12] sync refactor --- source/libs/sync/inc/syncRaftLog.h | 3 + source/libs/sync/src/syncRaftLog.c | 35 +++++- source/libs/sync/test/CMakeLists.txt | 14 +++ source/libs/sync/test/syncInitTest.cpp | 9 +- source/libs/sync/test/syncLogStoreTest.cpp | 132 +++++++++++++++++++++ 5 files changed, 188 insertions(+), 5 deletions(-) create mode 100644 source/libs/sync/test/syncLogStoreTest.cpp diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index ecdb544302..d59b3206b5 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -63,6 +63,9 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore); char* logStore2Str(SSyncLogStore* pLogStore); +// for debug +void logStorePrint(SSyncLogStore* pLogStore); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index d5735d9142..aa2595c199 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -115,11 +115,44 @@ SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) { return pEntry; } -cJSON* logStore2Json(SSyncLogStore* pLogStore) {} +cJSON* logStore2Json(SSyncLogStore* pLogStore) { + char u64buf[128]; + + SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data; + cJSON* pRoot = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); + cJSON_AddStringToObject(pRoot, "pWal", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastIndex(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); + + cJSON* pEntries = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "pEntries", pEntries); + SyncIndex lastIndex = logStoreLastIndex(pLogStore); + for (SyncIndex i = 1; i <= lastIndex; ++i) { + SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i); + cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry)); + syncEntryDestory(pEntry); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot); + return pJson; +} char* logStore2Str(SSyncLogStore* pLogStore) { cJSON* pJson = logStore2Json(pLogStore); char* serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; +} + +// for debug +void logStorePrint(SSyncLogStore* pLogStore) { + char* s = logStore2Str(pLogStore); + sTrace("%s", s); + free(s); } \ No newline at end of file diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 5a5186c7e2..2163583f0a 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -15,6 +15,7 @@ add_executable(syncUtilTest "") add_executable(syncVotesGrantedTest "") add_executable(syncVotesRespondTest "") add_executable(syncIndexMgrTest "") +add_executable(syncLogStoreTest "") target_sources(syncTest @@ -85,6 +86,10 @@ target_sources(syncIndexMgrTest PRIVATE "syncIndexMgrTest.cpp" ) +target_sources(syncLogStoreTest + PRIVATE + "syncLogStoreTest.cpp" +) target_include_directories(syncTest @@ -172,6 +177,11 @@ target_include_directories(syncIndexMgrTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncLogStoreTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -242,6 +252,10 @@ target_link_libraries(syncIndexMgrTest sync gtest_main ) +target_link_libraries(syncLogStoreTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp index f665c5a612..b6794544eb 100644 --- a/source/libs/sync/test/syncInitTest.cpp +++ b/source/libs/sync/test/syncInitTest.cpp @@ -53,6 +53,7 @@ SSyncNode* syncNodeInit() { gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->pSyncNode = pSyncNode; return pSyncNode; @@ -88,11 +89,11 @@ int main(int argc, char** argv) { SSyncNode* pSyncNode = syncInitTest(); assert(pSyncNode != NULL); - char* serialized = syncNode2Str(pSyncNode); - printf("%s\n", serialized); - free(serialized); + syncNodePrint((char*)"syncInitTest", pSyncNode); initRaftId(pSyncNode); + //-------------------------------------------------------------- + return 0; -} +} \ No newline at end of file diff --git a/source/libs/sync/test/syncLogStoreTest.cpp b/source/libs/sync/test/syncLogStoreTest.cpp new file mode 100644 index 0000000000..a859186bbe --- /dev/null +++ b/source/libs/sync/test/syncLogStoreTest.cpp @@ -0,0 +1,132 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 1; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM* pFsm; +SWal* pWal; +SSyncNode* pSyncNode; + +SSyncNode* syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); + + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = syncInfo.vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 100000; + walCfg.segSize = 100000; + walCfg.level = TAOS_WAL_WRITE; + pWal = walOpen("./wal_test", &walCfg); + + syncInfo.pWal = pWal; + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +SSyncNode* syncInitTest() { return syncNodeInit(); } + +void logStoreTest() { + logStorePrint(pSyncNode->pLogStore); + for (int i = 0; i < 5; ++i) { + SSyncRaftEntry* pEntry; + pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry); + } + logStorePrint(pSyncNode->pLogStore); + + pSyncNode->pLogStore->truncate(pSyncNode->pLogStore, 3); + logStorePrint(pSyncNode->pLogStore); +} + +void initRaftId(SSyncNode* pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char* s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +int main(int argc, char** argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + pSyncNode = syncInitTest(); + assert(pSyncNode != NULL); + + syncNodePrint((char*)"syncLogStoreTest", pSyncNode); + + initRaftId(pSyncNode); + + //-------------------------------------------------------------- + + logStoreTest(); + + //-------------------------------------------------------------- + // walClose(pWal); + + return 0; +} From ebeb4bb7a60caaf9b082b1ae06df1dc440222822 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 10 Mar 2022 14:34:02 +0800 Subject: [PATCH 11/12] sync refactor --- source/libs/sync/inc/syncMessage.h | 3 - source/libs/sync/inc/syncRaftEntry.h | 12 ++-- source/libs/sync/inc/syncUtil.h | 3 + source/libs/sync/src/syncMessage.c | 2 +- source/libs/sync/src/syncRaftEntry.c | 44 ++++++++++++-- source/libs/sync/src/syncRaftLog.c | 19 ++++-- source/libs/sync/src/syncUtil.c | 36 +++++++++++ source/libs/sync/test/CMakeLists.txt | 16 +++++ source/libs/sync/test/syncEntryTest.cpp | 81 +++++++++++++++++++++++++ 9 files changed, 195 insertions(+), 21 deletions(-) create mode 100644 source/libs/sync/test/syncEntryTest.cpp diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index dcd8f7c74e..6231cb8399 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -130,9 +130,6 @@ typedef struct SyncClientRequest { char data[]; } SyncClientRequest; -#define SYNC_CLIENT_REQUEST_FIX_LEN \ - (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t) + sizeof(bool) + sizeof(uint32_t)) - SyncClientRequest* syncClientRequestBuild(uint32_t dataLen); void syncClientRequestDestroy(SyncClientRequest* pMsg); void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen); diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 2d77af8835..b607849dc8 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -39,16 +39,14 @@ typedef struct SSyncRaftEntry { char data[]; } SSyncRaftEntry; -#define SYNC_ENTRY_FIX_LEN \ - (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint32_t) + sizeof(uint64_t) + sizeof(bool) + sizeof(SyncTerm) + \ - sizeof(SyncIndex) + sizeof(uint32_t)) - -SSyncRaftEntry* syncEntryBuild(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); +SSyncRaftEntry* syncEntryBuild(uint32_t dataLen); +SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); void syncEntryDestory(SSyncRaftEntry* pEntry); -void syncEntrySerialize(const SSyncRaftEntry* pEntry, char* buf, uint32_t bufLen); -void syncEntryDeserialize(const char* buf, uint32_t len, SSyncRaftEntry* pEntry); +char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); +SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry); char* syncEntry2Str(const SSyncRaftEntry* pEntry); +void syncEntryPrint(const SSyncRaftEntry* pEntry); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index f2c979da99..2bbc1948dd 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -49,6 +49,9 @@ cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p); cJSON* syncUtilRaftId2Json(const SRaftId* p); char* syncUtilRaftId2Str(const SRaftId* p); const char* syncUtilState2String(ESyncState state); +bool syncUtilCanPrint(char c); +char* syncUtilprintBin(char* ptr, uint32_t len); +char* syncUtilprintBin2(char* ptr, uint32_t len); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 11f823a048..baef49d748 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -348,7 +348,7 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId) // ---- message process SyncClientRequest---- SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { - uint32_t bytes = SYNC_CLIENT_REQUEST_FIX_LEN + dataLen; + uint32_t bytes = sizeof(SyncClientRequest) + dataLen; SyncClientRequest* pMsg = malloc(bytes); memset(pMsg, 0, bytes); pMsg->bytes = bytes; diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index cd14ea502e..7774cbb0ce 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -14,14 +14,22 @@ */ #include "syncRaftEntry.h" +#include "syncUtil.h" -SSyncRaftEntry* syncEntryBuild(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) { - uint32_t bytes = SYNC_ENTRY_FIX_LEN + pMsg->bytes; +SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) { + uint32_t bytes = sizeof(SSyncRaftEntry) + dataLen; SSyncRaftEntry* pEntry = malloc(bytes); assert(pEntry != NULL); memset(pEntry, 0, bytes); - pEntry->bytes = bytes; + pEntry->dataLen = dataLen; + return pEntry; +} + +SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) { + SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen); + assert(pEntry != NULL); + pEntry->msgType = pMsg->msgType; pEntry->originalRpcType = pMsg->originalRpcType; pEntry->seqNum = pMsg->seqNum; @@ -40,14 +48,23 @@ void syncEntryDestory(SSyncRaftEntry* pEntry) { } } -void syncEntrySerialize(const SSyncRaftEntry* pEntry, char* buf, uint32_t bufLen) { - assert(pEntry->bytes <= bufLen); +char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) { + char* buf = malloc(pEntry->bytes); + assert(buf != NULL); memcpy(buf, pEntry, pEntry->bytes); + if (len != NULL) { + *len = pEntry->bytes; + } + return buf; } -void syncEntryDeserialize(const char* buf, uint32_t len, SSyncRaftEntry* pEntry) { +SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SSyncRaftEntry* pEntry = malloc(bytes); + assert(pEntry != NULL); memcpy(pEntry, buf, len); assert(len == pEntry->bytes); + return pEntry; } cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) { @@ -66,6 +83,15 @@ cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) { cJSON_AddStringToObject(pRoot, "index", u64buf); cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen); + char* s; + s = syncUtilprintBin((char*)(pEntry->data), pEntry->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + free(s); + + s = syncUtilprintBin2((char*)(pEntry->data), pEntry->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + free(s); + cJSON* pJson = cJSON_CreateObject(); cJSON_AddItemToObject(pJson, "SSyncRaftEntry", pRoot); return pJson; @@ -76,4 +102,10 @@ char* syncEntry2Str(const SSyncRaftEntry* pEntry) { char* serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; +} + +void syncEntryPrint(const SSyncRaftEntry* pEntry) { + char* s = syncEntry2Str(pEntry); + sTrace("%s", s); + free(s); } \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index aa2595c199..d8a460629b 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -47,13 +47,16 @@ void logStoreDestory(SSyncLogStore* pLogStore) { int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - char* buf = malloc(pEntry->bytes); - syncEntrySerialize(pEntry, buf, pEntry->bytes); - walWrite(pWal, pEntry->index, pEntry->msgType, buf, pEntry->bytes); + assert(pEntry->index == logStoreLastIndex(pLogStore) + 1); + uint32_t len; + char* serialized = syncEntrySerialize(pEntry, &len); + assert(serialized != NULL); + + walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len); walFsync(pWal, true); - free(buf); + free(serialized); } // get one log entry, user need to free pEntry->pCont @@ -64,6 +67,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); walReadWithHandle(pWalHandle, index); + pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len); + assert(pEntry != NULL); // need to hold, do not new every time!! walCloseReadHandle(pWalHandle); @@ -79,9 +84,15 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { // return index of last entry SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) { + /* SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore); SyncIndex lastIndex = pLastEntry->index; free(pLastEntry); + */ + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + int64_t last = walGetLastVer(pWal); + SyncIndex lastIndex = last < 0 ? 0 : last; return lastIndex; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 3fb430a714..736260cafc 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -148,4 +148,40 @@ const char* syncUtilState2String(ESyncState state) { } else { return "TAOS_SYNC_STATE_UNKNOWN"; } +} + +bool syncUtilCanPrint(char c) { + if (c >= 32 && c <= 126) { + return true; + } else { + return false; + } +} + +char* syncUtilprintBin(char* ptr, uint32_t len) { + char* s = malloc(len + 1); + assert(s != NULL); + memset(s, 0, len + 1); + memcpy(s, ptr, len); + + for (int i = 0; i < len; ++i) { + if (!syncUtilCanPrint(s[i])) { + s[i] = '.'; + } + } + return s; +} + +char* syncUtilprintBin2(char* ptr, uint32_t len) { + uint32_t len2 = len * 4 + 1; + char* s = malloc(len2); + assert(s != NULL); + memset(s, 0, len2); + + char* p = s; + for (int i = 0; i < len; ++i) { + int n = sprintf(p, "%d,", ptr[i]); + p += n; + } + return s; } \ No newline at end of file diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 2163583f0a..bfde08ffac 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -16,6 +16,7 @@ add_executable(syncVotesGrantedTest "") add_executable(syncVotesRespondTest "") add_executable(syncIndexMgrTest "") add_executable(syncLogStoreTest "") +add_executable(syncEntryTest "") target_sources(syncTest @@ -90,6 +91,10 @@ target_sources(syncLogStoreTest PRIVATE "syncLogStoreTest.cpp" ) +target_sources(syncEntryTest + PRIVATE + "syncEntryTest.cpp" +) target_include_directories(syncTest @@ -182,6 +187,11 @@ target_include_directories(syncLogStoreTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncEntryTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -256,6 +266,10 @@ target_link_libraries(syncLogStoreTest sync gtest_main ) +target_link_libraries(syncEntryTest + sync + gtest_main +) enable_testing() @@ -263,3 +277,5 @@ add_test( NAME sync_test COMMAND syncTest ) + + diff --git a/source/libs/sync/test/syncEntryTest.cpp b/source/libs/sync/test/syncEntryTest.cpp new file mode 100644 index 0000000000..e54daaa79a --- /dev/null +++ b/source/libs/sync/test/syncEntryTest.cpp @@ -0,0 +1,81 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +void test1() { + SSyncRaftEntry* pEntry = syncEntryBuild(10); + assert(pEntry != NULL); + pEntry->msgType = 1; + pEntry->originalRpcType = 2; + pEntry->seqNum = 3; + pEntry->isWeak = true; + pEntry->term = 100; + pEntry->index = 200; + strcpy(pEntry->data, "test1"); + + syncEntryPrint(pEntry); + syncEntryDestory(pEntry); +} + +void test2() { + SyncClientRequest* pSyncMsg = syncClientRequestBuild(10); + pSyncMsg->originalRpcType = 33; + pSyncMsg->seqNum = 11; + pSyncMsg->isWeak = 1; + strcpy(pSyncMsg->data, "test2"); + + SSyncRaftEntry* pEntry = syncEntryBuild2(pSyncMsg, 100, 200); + syncEntryPrint(pEntry); + + syncClientRequestDestroy(pSyncMsg); + syncEntryDestory(pEntry); +} + +void test3() { + SSyncRaftEntry* pEntry = syncEntryBuild(10); + assert(pEntry != NULL); + pEntry->msgType = 11; + pEntry->originalRpcType = 22; + pEntry->seqNum = 33; + pEntry->isWeak = true; + pEntry->term = 44; + pEntry->index = 55; + strcpy(pEntry->data, "test3"); + syncEntryPrint(pEntry); + + uint32_t len; + char* serialized = syncEntrySerialize(pEntry, &len); + assert(serialized != NULL); + SSyncRaftEntry* pEntry2 = syncEntryDeserialize(serialized, len); + syncEntryPrint(pEntry2); + + free(serialized); + syncEntryDestory(pEntry2); + syncEntryDestory(pEntry); +} + +int main(int argc, char** argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + test1(); + test2(); + test3(); + + return 0; +} From 889d1339e598d8dd4cae3e5bd7a9410c6563157d Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 10 Mar 2022 16:18:16 +0800 Subject: [PATCH 12/12] sync refactor --- source/libs/sync/inc/syncRaftEntry.h | 1 + source/libs/sync/src/syncRaftEntry.c | 6 ++++ source/libs/sync/src/syncRaftLog.c | 34 ++++++++++++---------- source/libs/sync/test/syncLogStoreTest.cpp | 33 +++++++++++++-------- 4 files changed, 47 insertions(+), 27 deletions(-) diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index b607849dc8..be25675db4 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -47,6 +47,7 @@ SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry); char* syncEntry2Str(const SSyncRaftEntry* pEntry); void syncEntryPrint(const SSyncRaftEntry* pEntry); +void syncEntryPrint2(char *s, const SSyncRaftEntry* pEntry); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 7774cbb0ce..959bf49ee7 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -108,4 +108,10 @@ void syncEntryPrint(const SSyncRaftEntry* pEntry) { char* s = syncEntry2Str(pEntry); sTrace("%s", s); free(s); +} + +void syncEntryPrint2(char* s, const SSyncRaftEntry* pEntry) { + char* ss = syncEntry2Str(pEntry); + sTrace("%s | %s", s, ss); + free(ss); } \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index d8a460629b..d177d3ac9b 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -53,9 +53,11 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { char* serialized = syncEntrySerialize(pEntry, &len); assert(serialized != NULL); - walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len); - walFsync(pWal, true); + int code; + code = walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len); + assert(code == 0); + walFsync(pWal, true); free(serialized); } @@ -84,23 +86,20 @@ int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { // return index of last entry SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) { - /* - SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore); - SyncIndex lastIndex = pLastEntry->index; - free(pLastEntry); - */ SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; - int64_t last = walGetLastVer(pWal); - SyncIndex lastIndex = last < 0 ? 0 : last; + SyncIndex lastIndex = walGetLastVer(pWal); return lastIndex; } // return term of last entry SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { + SyncTerm lastTerm = 0; SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore); - SyncTerm lastTerm = pLastEntry->term; - free(pLastEntry); + if (pLastEntry != NULL) { + lastTerm = pLastEntry->term; + free(pLastEntry); + } return lastTerm; } @@ -121,8 +120,11 @@ SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) { SSyncLogStoreData* pData = pLogStore->data; SWal* pWal = pData->pWal; SyncIndex lastIndex = walGetLastVer(pWal); - SSyncRaftEntry* pEntry; - pEntry = logStoreGetEntry(pLogStore, lastIndex); + + SSyncRaftEntry* pEntry = NULL; + if (lastIndex > 0) { + pEntry = logStoreGetEntry(pLogStore, lastIndex); + } return pEntry; } @@ -143,7 +145,7 @@ cJSON* logStore2Json(SSyncLogStore* pLogStore) { cJSON* pEntries = cJSON_CreateArray(); cJSON_AddItemToObject(pRoot, "pEntries", pEntries); SyncIndex lastIndex = logStoreLastIndex(pLogStore); - for (SyncIndex i = 1; i <= lastIndex; ++i) { + for (SyncIndex i = 0; i <= lastIndex; ++i) { SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i); cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry)); syncEntryDestory(pEntry); @@ -164,6 +166,8 @@ char* logStore2Str(SSyncLogStore* pLogStore) { // for debug void logStorePrint(SSyncLogStore* pLogStore) { char* s = logStore2Str(pLogStore); - sTrace("%s", s); + // sTrace("%s", s); + fprintf(stderr, "logStorePrint: [len:%lu]| %s \n", strlen(s), s); + free(s); } \ No newline at end of file diff --git a/source/libs/sync/test/syncLogStoreTest.cpp b/source/libs/sync/test/syncLogStoreTest.cpp index a859186bbe..a5eb748de6 100644 --- a/source/libs/sync/test/syncLogStoreTest.cpp +++ b/source/libs/sync/test/syncLogStoreTest.cpp @@ -35,16 +35,19 @@ SSyncNode* syncNodeInit() { syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); + int code = walInit(); + assert(code == 0); SWalCfg walCfg; memset(&walCfg, 0, sizeof(SWalCfg)); walCfg.vgId = syncInfo.vgId; walCfg.fsyncPeriod = 1000; walCfg.retentionPeriod = 1000; walCfg.rollPeriod = 1000; - walCfg.retentionSize = 100000; - walCfg.segSize = 100000; - walCfg.level = TAOS_WAL_WRITE; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; pWal = walOpen("./wal_test", &walCfg); + assert(pWal != NULL); syncInfo.pWal = pWal; @@ -80,8 +83,20 @@ SSyncNode* syncInitTest() { return syncNodeInit(); } void logStoreTest() { logStorePrint(pSyncNode->pLogStore); for (int i = 0; i < 5; ++i) { - SSyncRaftEntry* pEntry; + int32_t dataLen = 10; + SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); + assert(pEntry != NULL); + pEntry->msgType = 1; + pEntry->originalRpcType = 2; + pEntry->seqNum = 3; + pEntry->isWeak = true; + pEntry->term = 100; + pEntry->index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1; + snprintf(pEntry->data, dataLen, "value%d", i); + + //syncEntryPrint2((char*)"write entry:", pEntry); pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry); + syncEntryDestory(pEntry); } logStorePrint(pSyncNode->pLogStore); @@ -117,16 +132,10 @@ int main(int argc, char** argv) { pSyncNode = syncInitTest(); assert(pSyncNode != NULL); - syncNodePrint((char*)"syncLogStoreTest", pSyncNode); - - initRaftId(pSyncNode); - - //-------------------------------------------------------------- + //syncNodePrint((char*)"syncLogStoreTest", pSyncNode); + //initRaftId(pSyncNode); logStoreTest(); - //-------------------------------------------------------------- - // walClose(pWal); - return 0; }