diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index ccbeb00bfd..68ca9eff17 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -24,6 +24,7 @@ extern "C" { #include #include "taosdef.h" #include "trpc.h" +#include "wal.h" typedef uint64_t SyncNodeId; typedef int32_t SyncGroupId; @@ -87,25 +88,22 @@ typedef struct SSyncFSM { } SSyncFSM; +struct SSyncRaftEntry; +typedef struct SSyncRaftEntry SSyncRaftEntry; + // abstract definition of log store in raft // SWal implements it typedef struct SSyncLogStore { void* data; // append one log entry - int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SRpcMsg* pBuf); + int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); - // get one log entry, user need to free pBuf->data - int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pBuf); + // get one log entry, user need to free pEntry->pCont + SSyncRaftEntry* (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index); - // update log store commit index with "index" - int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index); - - // truncate log with index, entries after the given index (>index) will be deleted - int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex index); - - // return commit index of log - SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore); + // truncate log with index, entries after the given index (>=index) will be deleted + int32_t (*truncate)(struct SSyncLogStore* pLogStore, SyncIndex fromIndex); // return index of last entry SyncIndex (*getLastIndex)(struct SSyncLogStore* pLogStore); @@ -113,6 +111,12 @@ typedef struct SSyncLogStore { // return term of last entry SyncTerm (*getLastTerm)(struct SSyncLogStore* pLogStore); + // update log store commit index with "index" + int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index); + + // return commit index of log + SyncIndex (*getCommitIndex)(struct SSyncLogStore* pLogStore); + } SSyncLogStore; // raft need to persist two variables in storage: currentTerm, voteFor @@ -134,7 +138,7 @@ typedef struct SSyncInfo { SyncGroupId vgId; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; - char walPath[TSDB_FILENAME_LEN]; + SWal* pWal; SSyncFSM* pFsm; void* rpcClient; @@ -153,7 +157,7 @@ void syncCleanUp(); int64_t syncStart(const SSyncInfo* pSyncInfo); void syncStop(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak); ESyncState syncGetMyRole(int64_t rid); void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole); diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index 35d3046d66..5999ef8300 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -25,7 +25,6 @@ extern "C" { #include #include "syncInt.h" #include "syncMessage.h" -#include "syncRaft.h" #include "taosdef.h" // TLA+ Spec diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index 75b82aa531..c0c1f76707 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -25,7 +25,6 @@ extern "C" { #include #include "syncInt.h" #include "syncMessage.h" -#include "syncRaft.h" #include "taosdef.h" // TLA+ Spec diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 8b77e292c4..2932240ec1 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -116,7 +116,8 @@ typedef struct SSyncNode { SyncGroupId vgId; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; - char walPath[TSDB_FILENAME_LEN]; + char raftStorePath[TSDB_FILENAME_LEN * 2]; + SWal* pWal; void* rpcClient; int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); void* queue; @@ -195,8 +196,6 @@ typedef struct SSyncNode { SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); void syncNodeClose(SSyncNode* pSyncNode); -cJSON* syncNode2Json(const SSyncNode* pSyncNode); -char* syncNode2Str(const SSyncNode* pSyncNode); int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); @@ -213,6 +212,11 @@ int32_t syncNodeRestartElectTimer(SSyncNode* pSyncNode, int32_t ms); int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); +// for debug +cJSON* syncNode2Json(const SSyncNode* pSyncNode); +char* syncNode2Str(const SSyncNode* pSyncNode); +void syncNodePrint(char* s, const SSyncNode* pSyncNode); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 3405f0f6cc..6231cb8399 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -24,8 +24,7 @@ extern "C" { #include #include #include "cJSON.h" -#include "sync.h" -#include "syncRaftEntry.h" +#include "syncInt.h" #include "taosdef.h" // encode as uint32 @@ -46,6 +45,7 @@ typedef enum ESyncMessageType { // --------------------------------------------- cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); cJSON* syncRpcUnknownMsg2Json(); +char* syncRpcMsg2Str(SRpcMsg* pRpcMsg); // --------------------------------------------- typedef enum ESyncTimeoutType { @@ -123,12 +123,22 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId); typedef struct SyncClientRequest { uint32_t bytes; uint32_t msgType; - int64_t seqNum; + uint32_t originalRpcType; + uint64_t seqNum; bool isWeak; uint32_t dataLen; char data[]; } SyncClientRequest; +SyncClientRequest* syncClientRequestBuild(uint32_t dataLen); +void syncClientRequestDestroy(SyncClientRequest* pMsg); +void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen); +void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg); +void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); +void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg); +cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg); +SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak); + // --------------------------------------------- typedef struct SyncClientRequestReply { uint32_t bytes; diff --git a/source/libs/sync/inc/syncOnMessage.h b/source/libs/sync/inc/syncOnMessage.h index 8eae4fed4d..7cb186a812 100644 --- a/source/libs/sync/inc/syncOnMessage.h +++ b/source/libs/sync/inc/syncOnMessage.h @@ -23,7 +23,6 @@ extern "C" { #include #include #include -#include "syncRaft.h" #include "taosdef.h" #ifdef __cplusplus diff --git a/source/libs/sync/inc/syncRaft.h b/source/libs/sync/inc/syncRaft.h deleted file mode 100644 index bc5cf26a4c..0000000000 --- a/source/libs/sync/inc/syncRaft.h +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_LIBS_SYNC_RAFT_H -#define _TD_LIBS_SYNC_RAFT_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include -#include -#include -#include "sync.h" -#include "syncMessage.h" -#include "taosdef.h" - -#if 0 - -typedef struct SRaftId { - SyncNodeId addr; - SyncGroupId vgId; -} SRaftId; - -typedef struct SRaft { - SRaftId id; - SSyncFSM* pFsm; - - int32_t (*FpPing)(struct SRaft* ths, const RaftPing* pMsg); - - int32_t (*FpOnPing)(struct SRaft* ths, RaftPing* pMsg); - - int32_t (*FpOnPingReply)(struct SRaft* ths, RaftPingReply* pMsg); - - int32_t (*FpRequestVote)(struct SRaft* ths, const RaftRequestVote* pMsg); - - int32_t (*FpOnRequestVote)(struct SRaft* ths, RaftRequestVote* pMsg); - - int32_t (*FpOnRequestVoteReply)(struct SRaft* ths, RaftRequestVoteReply* pMsg); - - int32_t (*FpAppendEntries)(struct SRaft* ths, const RaftAppendEntries* pMsg); - - int32_t (*FpOnAppendEntries)(struct SRaft* ths, RaftAppendEntries* pMsg); - - int32_t (*FpOnAppendEntriesReply)(struct SRaft* ths, RaftAppendEntriesReply* pMsg); - -} SRaft; - -SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm); - -void raftClose(SRaft* pRaft); - -static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg); - -static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg); - -static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg); - -static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg); - -static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg); - -static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg); - -static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg); - -static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg); - -static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg); - -int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak); - -static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft); - -#endif - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_LIBS_SYNC_RAFT_H*/ diff --git a/source/libs/sync/inc/syncRaftEntry.h b/source/libs/sync/inc/syncRaftEntry.h index 516bef4d48..be25675db4 100644 --- a/source/libs/sync/inc/syncRaftEntry.h +++ b/source/libs/sync/inc/syncRaftEntry.h @@ -24,15 +24,31 @@ extern "C" { #include #include #include "syncInt.h" +#include "syncMessage.h" #include "taosdef.h" typedef struct SSyncRaftEntry { - SyncTerm term; - SyncIndex index; - SSyncBuffer data; - int8_t flag; + uint32_t bytes; + uint32_t msgType; + uint32_t originalRpcType; + uint64_t seqNum; + bool isWeak; + SyncTerm term; + SyncIndex index; + uint32_t dataLen; + char data[]; } SSyncRaftEntry; +SSyncRaftEntry* syncEntryBuild(uint32_t dataLen); +SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); +void syncEntryDestory(SSyncRaftEntry* pEntry); +char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); +SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len); +cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry); +char* syncEntry2Str(const SSyncRaftEntry* pEntry); +void syncEntryPrint(const SSyncRaftEntry* pEntry); +void syncEntryPrint2(char *s, const SSyncRaftEntry* pEntry); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncRaftLog.h b/source/libs/sync/inc/syncRaftLog.h index ee971062cf..d59b3206b5 100644 --- a/source/libs/sync/inc/syncRaftLog.h +++ b/source/libs/sync/inc/syncRaftLog.h @@ -24,27 +24,47 @@ extern "C" { #include #include #include "syncInt.h" +#include "syncRaftEntry.h" #include "taosdef.h" -int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf); +typedef struct SSyncLogStoreData { + SSyncNode* pSyncNode; + SWal* pWal; +} SSyncLogStoreData; -// get one log entry, user need to free pBuf->data -int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf); +SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode); -// update log store commit index with "index" -int32_t raftLogUpdateCommitIndex(struct SSyncLogStore* pLogStore, SyncIndex index); +void logStoreDestory(SSyncLogStore* pLogStore); -// truncate log with index, entries after the given index (>index) will be deleted -int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex index); +// append one log entry +int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry); -// return commit index of log -SyncIndex raftLogGetCommitIndex(struct SSyncLogStore* pLogStore); +// get one log entry, user need to free pEntry->pCont +SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index); + +// truncate log with index, entries after the given index (>=index) will be deleted +int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex); // return index of last entry -SyncIndex raftLogGetLastIndex(struct SSyncLogStore* pLogStore); +SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore); // return term of last entry -SyncTerm raftLogGetLastTerm(struct SSyncLogStore* pLogStore); +SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore); + +// update log store commit index with "index" +int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index); + +// return commit index of log +SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore); + +SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore); + +cJSON* logStore2Json(SSyncLogStore* pLogStore); + +char* logStore2Str(SSyncLogStore* pLogStore); + +// for debug +void logStorePrint(SSyncLogStore* pLogStore); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 591a5b9963..1c25b799b4 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -25,7 +25,6 @@ extern "C" { #include #include "cJSON.h" #include "syncInt.h" -#include "syncRaft.h" #include "taosdef.h" #define RAFT_STORE_BLOCK_SIZE 512 diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index 8bb4976de2..fd4ccd5371 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -25,7 +25,6 @@ extern "C" { #include #include "syncInt.h" #include "syncMessage.h" -#include "syncRaft.h" #include "taosdef.h" // TLA+ Spec diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index ab9430b857..bcaf71a541 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -25,7 +25,6 @@ extern "C" { #include #include "syncInt.h" #include "syncMessage.h" -#include "syncRaft.h" #include "taosdef.h" // TLA+ Spec diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 89fcb230fb..611f33a0f2 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -24,7 +24,6 @@ extern "C" { #include #include #include "syncInt.h" -#include "syncRaft.h" #include "taosdef.h" int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot); diff --git a/source/libs/sync/inc/syncTimeout.h b/source/libs/sync/inc/syncTimeout.h index efd5aae48e..25c26c909d 100644 --- a/source/libs/sync/inc/syncTimeout.h +++ b/source/libs/sync/inc/syncTimeout.h @@ -25,7 +25,6 @@ extern "C" { #include #include "syncInt.h" #include "syncMessage.h" -#include "syncRaft.h" #include "taosdef.h" // TLA+ Spec diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index f2c979da99..2bbc1948dd 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -49,6 +49,9 @@ cJSON* syncUtilNodeInfo2Json(const SNodeInfo* p); cJSON* syncUtilRaftId2Json(const SRaftId* p); char* syncUtilRaftId2Str(const SRaftId* p); const char* syncUtilState2String(ESyncState state); +bool syncUtilCanPrint(char c); +char* syncUtilprintBin(char* ptr, uint32_t len); +char* syncUtilprintBin2(char* ptr, uint32_t len); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 7cb42c9f87..9d2afb03ca 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -15,7 +15,7 @@ #include "syncIO.h" #include -#include "syncOnMessage.h" +#include "syncMessage.h" #include "tglobal.h" #include "ttimer.h" #include "tutil.h" @@ -220,12 +220,17 @@ static void *syncIOConsumerFunc(void *param) { while (1) { int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL); sTrace("syncIOConsumerFunc %d msgs are received", numOfMsgs); - if (numOfMsgs <= 0) break; + if (numOfMsgs <= 0) { + break; + } for (int i = 0; i < numOfMsgs; ++i) { taosGetQitem(qall, (void **)&pRpcMsg); + + char *s = syncRpcMsg2Str(pRpcMsg); sTrace("syncIOConsumerFunc get item from queue: msgType:%d contLen:%d msg:%s", pRpcMsg->msgType, pRpcMsg->contLen, - (char *)(pRpcMsg->pCont)); + s); + free(s); if (pRpcMsg->msgType == SYNC_PING) { if (io->FpOnSyncPing != NULL) { @@ -247,7 +252,7 @@ static void *syncIOConsumerFunc(void *param) { } } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { - if (io->FpOnSyncRequestVote) { + if (io->FpOnSyncRequestVote != NULL) { SyncRequestVote *pSyncMsg; pSyncMsg = syncRequestVoteBuild(pRpcMsg->contLen); syncRequestVoteFromRpcMsg(pRpcMsg, pSyncMsg); @@ -256,7 +261,7 @@ static void *syncIOConsumerFunc(void *param) { } } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { - if (io->FpOnSyncRequestVoteReply) { + if (io->FpOnSyncRequestVoteReply != NULL) { SyncRequestVoteReply *pSyncMsg; pSyncMsg = SyncRequestVoteReplyBuild(); syncRequestVoteReplyFromRpcMsg(pRpcMsg, pSyncMsg); @@ -265,7 +270,7 @@ static void *syncIOConsumerFunc(void *param) { } } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) { - if (io->FpOnSyncAppendEntries) { + if (io->FpOnSyncAppendEntries != NULL) { SyncAppendEntries *pSyncMsg; pSyncMsg = syncAppendEntriesBuild(pRpcMsg->contLen); syncAppendEntriesFromRpcMsg(pRpcMsg, pSyncMsg); @@ -274,7 +279,7 @@ static void *syncIOConsumerFunc(void *param) { } } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) { - if (io->FpOnSyncAppendEntriesReply) { + if (io->FpOnSyncAppendEntriesReply != NULL) { SyncAppendEntriesReply *pSyncMsg; pSyncMsg = syncAppendEntriesReplyBuild(); syncAppendEntriesReplyFromRpcMsg(pRpcMsg, pSyncMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 3b8d716dbe..5cd7149b72 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -18,8 +18,9 @@ #include "syncAppendEntries.h" #include "syncAppendEntriesReply.h" #include "syncEnv.h" +#include "syncIndexMgr.h" #include "syncInt.h" -#include "syncRaft.h" +#include "syncRaftLog.h" #include "syncRaftStore.h" #include "syncRequestVote.h" #include "syncRequestVoteReply.h" @@ -59,13 +60,32 @@ int64_t syncStart(const SSyncInfo* pSyncInfo) { return 0; } -void syncStop(int64_t rid) {} +void syncStop(int64_t rid) { + SSyncNode* pSyncNode = NULL; // get pointer from rid + syncNodeClose(pSyncNode); +} int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; } -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; } +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pMsg, bool isWeak) { + SSyncNode* pSyncNode = NULL; // get pointer from rid + if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) { + SyncClientRequest* pSyncMsg = syncClientRequestBuild2(pMsg, 0, isWeak); + SRpcMsg rpcMsg; + syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + syncClientRequestDestroy(pSyncMsg); + } else { + sTrace("syncForwardToPeer not leader, %s", syncUtilState2String(pSyncNode->state)); + return -1; // need define err code !! + } + return 0; +} -ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } +ESyncState syncGetMyRole(int64_t rid) { + SSyncNode* pSyncNode = NULL; // get pointer from rid + return pSyncNode->state; +} void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} @@ -78,7 +98,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->vgId = pSyncInfo->vgId; pSyncNode->syncCfg = pSyncInfo->syncCfg; memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path)); - memcpy(pSyncNode->walPath, pSyncInfo->walPath, sizeof(pSyncNode->walPath)); + snprintf(pSyncNode->raftStorePath, sizeof(pSyncNode->raftStorePath), "%s/raft_store.json", pSyncInfo->path); + pSyncNode->pWal = pSyncInfo->pWal; pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; pSyncNode->queue = pSyncInfo->queue; @@ -114,20 +135,27 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { // init life cycle - // init server vars + // init TLA+ server vars pSyncNode->state = TAOS_SYNC_STATE_FOLLOWER; - pSyncNode->pRaftStore = raftStoreOpen(pSyncInfo->walPath); + pSyncNode->pRaftStore = raftStoreOpen(pSyncNode->raftStorePath); assert(pSyncNode->pRaftStore != NULL); - // init candidate vars + // init TLA+ candidate vars pSyncNode->pVotesGranted = voteGrantedCreate(pSyncNode); assert(pSyncNode->pVotesGranted != NULL); pSyncNode->pVotesRespond = votesRespondCreate(pSyncNode); assert(pSyncNode->pVotesRespond != NULL); - // init leader vars - pSyncNode->pNextIndex = NULL; - pSyncNode->pMatchIndex = NULL; + // init TLA+ leader vars + pSyncNode->pNextIndex = syncIndexMgrCreate(pSyncNode); + assert(pSyncNode->pNextIndex != NULL); + pSyncNode->pMatchIndex = syncIndexMgrCreate(pSyncNode); + assert(pSyncNode->pMatchIndex != NULL); + + // init TLA+ log vars + pSyncNode->pLogStore = logStoreCreate(pSyncNode); + assert(pSyncNode->pLogStore != NULL); + pSyncNode->commitIndex = 0; // init ping timer pSyncNode->pPingTimer = NULL; @@ -177,7 +205,8 @@ cJSON* syncNode2Json(const SSyncNode* pSyncNode) { // init by SSyncInfo cJSON_AddNumberToObject(pRoot, "vgId", pSyncNode->vgId); cJSON_AddStringToObject(pRoot, "path", pSyncNode->path); - cJSON_AddStringToObject(pRoot, "walPath", pSyncNode->walPath); + snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->pWal); + cJSON_AddStringToObject(pRoot, "pWal", u64buf); snprintf(u64buf, sizeof(u64buf), "%p", pSyncNode->rpcClient); cJSON_AddStringToObject(pRoot, "rpcClient", u64buf); @@ -298,6 +327,13 @@ char* syncNode2Str(const SSyncNode* pSyncNode) { return serialized; } +void syncNodePrint(char* s, const SSyncNode* pSyncNode) { + char* ss = syncNode2Str(pSyncNode); + // sTrace("syncNodePrint: %s [len:%lu]| %s", s, strlen(ss), ss); + fprintf(stderr, "syncNodePrint: %s [len:%lu]| %s", s, strlen(ss), ss); + free(ss); +} + int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); @@ -472,6 +508,8 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { } static void syncNodeEqPingTimer(void* param, void* tmrId) { + sTrace("<-- syncNodeEqPingTimer -->"); + SSyncNode* pSyncNode = (SSyncNode*)param; if (atomic_load_64(&pSyncNode->pingTimerLogicClockUser) <= atomic_load_64(&pSyncNode->pingTimerLogicClock)) { SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, atomic_load_64(&pSyncNode->pingTimerLogicClock), @@ -484,7 +522,7 @@ static void syncNodeEqPingTimer(void* param, void* tmrId) { // reset timer ms // pSyncNode->pingTimerMS += 100; - taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { sTrace("syncNodeEqPingTimer: pingTimerLogicClock:%lu, pingTimerLogicClockUser:%lu", pSyncNode->pingTimerLogicClock, @@ -506,7 +544,7 @@ static void syncNodeEqElectTimer(void* param, void* tmrId) { // reset timer ms pSyncNode->electTimerMS = syncUtilElectRandomMS(); - taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { sTrace("syncNodeEqElectTimer: electTimerLogicClock:%lu, electTimerLogicClockUser:%lu", @@ -530,7 +568,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) { // reset timer ms // pSyncNode->heartbeatTimerMS += 100; - taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + taosTmrReset(syncNodeEqHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pHeartbeatTimer); } else { sTrace("syncNodeEqHeartbeatTimer: heartbeatTimerLogicClock:%lu, heartbeatTimerLogicClockUser:%lu", diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 14f139a803..baef49d748 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -14,7 +14,6 @@ */ #include "syncMessage.h" -#include "syncRaft.h" #include "syncUtil.h" #include "tcoding.h" @@ -36,7 +35,8 @@ cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { pRoot = syncPingReply2Json(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) { - pRoot = syncRpcUnknownMsg2Json(); + SyncClientRequest* pSyncMsg = (SyncClientRequest*)pRpcMsg->pCont; + pRoot = syncClientRequest2Json(pSyncMsg); } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) { pRoot = syncRpcUnknownMsg2Json(); @@ -76,6 +76,13 @@ cJSON* syncRpcUnknownMsg2Json() { return pJson; } +char* syncRpcMsg2Str(SRpcMsg* pRpcMsg) { + cJSON* pJson = syncRpcMsg2Json(pRpcMsg); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + // ---- message process SyncTimeout---- SyncTimeout* syncTimeoutBuild() { uint32_t bytes = sizeof(SyncTimeout); @@ -149,6 +156,7 @@ SyncPing* syncPingBuild(uint32_t dataLen) { pMsg->bytes = bytes; pMsg->msgType = SYNC_PING; pMsg->dataLen = dataLen; + return pMsg; } void syncPingDestroy(SyncPing* pMsg) { @@ -247,6 +255,7 @@ SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { pMsg->bytes = bytes; pMsg->msgType = SYNC_PING_REPLY; pMsg->dataLen = dataLen; + return pMsg; } void syncPingReplyDestroy(SyncPingReply* pMsg) { @@ -337,6 +346,73 @@ SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId) return pMsg; } +// ---- message process SyncClientRequest---- +SyncClientRequest* syncClientRequestBuild(uint32_t dataLen) { + uint32_t bytes = sizeof(SyncClientRequest) + dataLen; + SyncClientRequest* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_CLIENT_REQUEST; + pMsg->seqNum = 0; + pMsg->isWeak = false; + pMsg->dataLen = dataLen; + return pMsg; +} + +void syncClientRequestDestroy(SyncClientRequest* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncClientRequestSerialize(const SyncClientRequest* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncClientRequestDeserialize(const char* buf, uint32_t len, SyncClientRequest* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); +} + +void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncClientRequestSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg) { + syncClientRequestDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + cJSON_AddNumberToObject(pRoot, "originalRpcType", pMsg->originalRpcType); + snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->seqNum); + cJSON_AddStringToObject(pRoot, "seqNum", u64buf); + cJSON_AddNumberToObject(pRoot, "isWeak", pMsg->isWeak); + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncClientRequest", pRoot); + return pJson; +} + +SyncClientRequest* syncClientRequestBuild2(const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum, bool isWeak) { + SyncClientRequest* pMsg = syncClientRequestBuild(pOriginalRpcMsg->contLen); + pMsg->originalRpcType = pOriginalRpcMsg->msgType; + pMsg->seqNum = seqNum; + pMsg->isWeak = isWeak; + memcpy(pMsg->data, pOriginalRpcMsg->pCont, pOriginalRpcMsg->contLen); + return pMsg; +} + // ---- message process SyncRequestVote---- SyncRequestVote* syncRequestVoteBuild() { uint32_t bytes = sizeof(SyncRequestVote); @@ -344,6 +420,7 @@ SyncRequestVote* syncRequestVoteBuild() { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->msgType = SYNC_REQUEST_VOTE; + return pMsg; } void syncRequestVoteDestroy(SyncRequestVote* pMsg) { @@ -429,6 +506,7 @@ SyncRequestVoteReply* SyncRequestVoteReplyBuild() { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->msgType = SYNC_REQUEST_VOTE_REPLY; + return pMsg; } void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg) { @@ -512,6 +590,7 @@ SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen) { pMsg->bytes = bytes; pMsg->msgType = SYNC_APPEND_ENTRIES; pMsg->dataLen = dataLen; + return pMsg; } void syncAppendEntriesDestroy(SyncAppendEntries* pMsg) { @@ -604,6 +683,7 @@ SyncAppendEntriesReply* syncAppendEntriesReplyBuild() { memset(pMsg, 0, bytes); pMsg->bytes = bytes; pMsg->msgType = SYNC_APPEND_ENTRIES_REPLY; + return pMsg; } void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg) { diff --git a/source/libs/sync/src/syncRaft.c b/source/libs/sync/src/syncRaft.c deleted file mode 100644 index b07c6ea797..0000000000 --- a/source/libs/sync/src/syncRaft.c +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "syncRaft.h" -#include "sync.h" - -#if 0 - -SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) { - SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft)); - assert(pRaft != NULL); - - pRaft->id = raftId; - pRaft->pFsm = pFsm; - - pRaft->FpPing = doRaftPing; - pRaft->FpOnPing = onRaftPing; - pRaft->FpOnPingReply = onRaftPingReply; - - pRaft->FpRequestVote = doRaftRequestVote; - pRaft->FpOnRequestVote = onRaftRequestVote; - pRaft->FpOnRequestVoteReply = onRaftRequestVoteReply; - - pRaft->FpAppendEntries = doRaftAppendEntries; - pRaft->FpOnAppendEntries = onRaftAppendEntries; - pRaft->FpOnAppendEntriesReply = onRaftAppendEntriesReply; - - return pRaft; -} - -void raftClose(SRaft* pRaft) { - assert(pRaft != NULL); - free(pRaft); -} - -static int32_t doRaftPing(struct SRaft* ths, const RaftPing* pMsg) { return 0; } - -static int32_t onRaftPing(struct SRaft* ths, RaftPing* pMsg) { return 0; } - -static int32_t onRaftPingReply(struct SRaft* ths, RaftPingReply* pMsg) { return 0; } - -static int32_t doRaftRequestVote(struct SRaft* ths, const RaftRequestVote* pMsg) { return 0; } - -static int32_t onRaftRequestVote(struct SRaft* ths, RaftRequestVote* pMsg) { return 0; } - -static int32_t onRaftRequestVoteReply(struct SRaft* ths, RaftRequestVoteReply* pMsg) { return 0; } - -static int32_t doRaftAppendEntries(struct SRaft* ths, const RaftAppendEntries* pMsg) { return 0; } - -static int32_t onRaftAppendEntries(struct SRaft* ths, RaftAppendEntries* pMsg) { return 0; } - -static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesReply* pMsg) { return 0; } - -int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; } - -static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; } - -#endif \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index e525d3c7c2..959bf49ee7 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -14,3 +14,104 @@ */ #include "syncRaftEntry.h" +#include "syncUtil.h" + +SSyncRaftEntry* syncEntryBuild(uint32_t dataLen) { + uint32_t bytes = sizeof(SSyncRaftEntry) + dataLen; + SSyncRaftEntry* pEntry = malloc(bytes); + assert(pEntry != NULL); + memset(pEntry, 0, bytes); + pEntry->bytes = bytes; + pEntry->dataLen = dataLen; + return pEntry; +} + +SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index) { + SSyncRaftEntry* pEntry = syncEntryBuild(pMsg->dataLen); + assert(pEntry != NULL); + + pEntry->msgType = pMsg->msgType; + pEntry->originalRpcType = pMsg->originalRpcType; + pEntry->seqNum = pMsg->seqNum; + pEntry->isWeak = pMsg->isWeak; + pEntry->term = term; + pEntry->index = index; + pEntry->dataLen = pMsg->dataLen; + memcpy(pEntry->data, pMsg->data, pMsg->dataLen); + + return pEntry; +} + +void syncEntryDestory(SSyncRaftEntry* pEntry) { + if (pEntry != NULL) { + free(pEntry); + } +} + +char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len) { + char* buf = malloc(pEntry->bytes); + assert(buf != NULL); + memcpy(buf, pEntry, pEntry->bytes); + if (len != NULL) { + *len = pEntry->bytes; + } + return buf; +} + +SSyncRaftEntry* syncEntryDeserialize(const char* buf, uint32_t len) { + uint32_t bytes = *((uint32_t*)buf); + SSyncRaftEntry* pEntry = malloc(bytes); + assert(pEntry != NULL); + memcpy(pEntry, buf, len); + assert(len == pEntry->bytes); + return pEntry; +} + +cJSON* syncEntry2Json(const SSyncRaftEntry* pEntry) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pEntry->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pEntry->msgType); + cJSON_AddNumberToObject(pRoot, "originalRpcType", pEntry->originalRpcType); + snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->seqNum); + cJSON_AddStringToObject(pRoot, "seqNum", u64buf); + cJSON_AddNumberToObject(pRoot, "isWeak", pEntry->isWeak); + snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->term); + cJSON_AddStringToObject(pRoot, "term", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", pEntry->index); + cJSON_AddStringToObject(pRoot, "index", u64buf); + cJSON_AddNumberToObject(pRoot, "dataLen", pEntry->dataLen); + + char* s; + s = syncUtilprintBin((char*)(pEntry->data), pEntry->dataLen); + cJSON_AddStringToObject(pRoot, "data", s); + free(s); + + s = syncUtilprintBin2((char*)(pEntry->data), pEntry->dataLen); + cJSON_AddStringToObject(pRoot, "data2", s); + free(s); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SSyncRaftEntry", pRoot); + return pJson; +} + +char* syncEntry2Str(const SSyncRaftEntry* pEntry) { + cJSON* pJson = syncEntry2Json(pEntry); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +void syncEntryPrint(const SSyncRaftEntry* pEntry) { + char* s = syncEntry2Str(pEntry); + sTrace("%s", s); + free(s); +} + +void syncEntryPrint2(char* s, const SSyncRaftEntry* pEntry) { + char* ss = syncEntry2Str(pEntry); + sTrace("%s | %s", s, ss); + free(ss); +} \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftLog.c b/source/libs/sync/src/syncRaftLog.c index e467057c8f..d177d3ac9b 100644 --- a/source/libs/sync/src/syncRaftLog.c +++ b/source/libs/sync/src/syncRaftLog.c @@ -14,46 +14,160 @@ */ #include "syncRaftLog.h" +#include "wal.h" -int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf) { return 0; } +SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) { + SSyncLogStore* pLogStore = malloc(sizeof(SSyncLogStore)); + assert(pLogStore != NULL); -// get one log entry, user need to free pBuf->data -int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf) { return 0; } + pLogStore->data = malloc(sizeof(SSyncLogStoreData)); + assert(pLogStore->data != NULL); -// TLA+ Spec -// \* Leader i advances its commitIndex. -// \* This is done as a separate step from handling AppendEntries responses, -// \* in part to minimize atomic regions, and in part so that leaders of -// \* single-server clusters are able to mark entries committed. -// AdvanceCommitIndex(i) == -// /\ state[i] = Leader -// /\ LET \* The set of servers that agree up through index. -// Agree(index) == {i} \cup {k \in Server : -// matchIndex[i][k] >= index} -// \* The maximum indexes for which a quorum agrees -// agreeIndexes == {index \in 1..Len(log[i]) : -// Agree(index) \in Quorum} -// \* New value for commitIndex'[i] -// newCommitIndex == -// IF /\ agreeIndexes /= {} -// /\ log[i][Max(agreeIndexes)].term = currentTerm[i] -// THEN -// Max(agreeIndexes) -// ELSE -// commitIndex[i] -// IN commitIndex' = [commitIndex EXCEPT ![i] = newCommitIndex] -// /\ UNCHANGED <> -// -int32_t raftLogupdateCommitIndex(struct SSyncLogStore* pLogStore, SyncIndex index) { return 0; } + SSyncLogStoreData* pData = pLogStore->data; + pData->pSyncNode = pSyncNode; + pData->pWal = pSyncNode->pWal; -// truncate log with index, entries after the given index (>index) will be deleted -int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex index) { return 0; } + pLogStore->appendEntry = logStoreAppendEntry; + pLogStore->getEntry = logStoreGetEntry; + pLogStore->truncate = logStoreTruncate; + pLogStore->getLastIndex = logStoreLastIndex; + pLogStore->getLastTerm = logStoreLastTerm; + pLogStore->updateCommitIndex = logStoreUpdateCommitIndex; + pLogStore->getCommitIndex = logStoreGetCommitIndex; +} -// return commit index of log -SyncIndex raftLogGetCommitIndex(struct SSyncLogStore* pLogStore) { return 0; } +void logStoreDestory(SSyncLogStore* pLogStore) { + if (pLogStore != NULL) { + free(pLogStore->data); + free(pLogStore); + } +} + +// append one log entry +int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + + assert(pEntry->index == logStoreLastIndex(pLogStore) + 1); + uint32_t len; + char* serialized = syncEntrySerialize(pEntry, &len); + assert(serialized != NULL); + + int code; + code = walWrite(pWal, pEntry->index, pEntry->msgType, serialized, len); + assert(code == 0); + + walFsync(pWal, true); + free(serialized); +} + +// get one log entry, user need to free pEntry->pCont +SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + SSyncRaftEntry* pEntry; + + SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); + walReadWithHandle(pWalHandle, index); + pEntry = syncEntryDeserialize(pWalHandle->pHead->head.body, pWalHandle->pHead->head.len); + assert(pEntry != NULL); + + // need to hold, do not new every time!! + walCloseReadHandle(pWalHandle); + return pEntry; +} + +// truncate log with index, entries after the given index (>=index) will be deleted +int32_t logStoreTruncate(SSyncLogStore* pLogStore, SyncIndex fromIndex) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + walRollback(pWal, fromIndex); +} // return index of last entry -SyncIndex raftLogGetLastIndex(struct SSyncLogStore* pLogStore) { return 0; } +SyncIndex logStoreLastIndex(SSyncLogStore* pLogStore) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + SyncIndex lastIndex = walGetLastVer(pWal); + return lastIndex; +} // return term of last entry -SyncTerm raftLogGetLastTerm(struct SSyncLogStore* pLogStore) { return 0; } \ No newline at end of file +SyncTerm logStoreLastTerm(SSyncLogStore* pLogStore) { + SyncTerm lastTerm = 0; + SSyncRaftEntry* pLastEntry = logStoreGetLastEntry(pLogStore); + if (pLastEntry != NULL) { + lastTerm = pLastEntry->term; + free(pLastEntry); + } + return lastTerm; +} + +// update log store commit index with "index" +int32_t logStoreUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + walCommit(pWal, index); +} + +// return commit index of log +SyncIndex logStoreGetCommitIndex(SSyncLogStore* pLogStore) { + SSyncLogStoreData* pData = pLogStore->data; + return pData->pSyncNode->commitIndex; +} + +SSyncRaftEntry* logStoreGetLastEntry(SSyncLogStore* pLogStore) { + SSyncLogStoreData* pData = pLogStore->data; + SWal* pWal = pData->pWal; + SyncIndex lastIndex = walGetLastVer(pWal); + + SSyncRaftEntry* pEntry = NULL; + if (lastIndex > 0) { + pEntry = logStoreGetEntry(pLogStore, lastIndex); + } + return pEntry; +} + +cJSON* logStore2Json(SSyncLogStore* pLogStore) { + char u64buf[128]; + + SSyncLogStoreData* pData = (SSyncLogStoreData*)pLogStore->data; + cJSON* pRoot = cJSON_CreateObject(); + snprintf(u64buf, sizeof(u64buf), "%p", pData->pSyncNode); + cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf); + snprintf(u64buf, sizeof(u64buf), "%p", pData->pWal); + cJSON_AddStringToObject(pRoot, "pWal", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastIndex(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastIndex", u64buf); + snprintf(u64buf, sizeof(u64buf), "%lu", logStoreLastTerm(pLogStore)); + cJSON_AddStringToObject(pRoot, "LastTerm", u64buf); + + cJSON* pEntries = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "pEntries", pEntries); + SyncIndex lastIndex = logStoreLastIndex(pLogStore); + for (SyncIndex i = 0; i <= lastIndex; ++i) { + SSyncRaftEntry* pEntry = logStoreGetEntry(pLogStore, i); + cJSON_AddItemToArray(pEntries, syncEntry2Json(pEntry)); + syncEntryDestory(pEntry); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SSyncLogStore", pRoot); + return pJson; +} + +char* logStore2Str(SSyncLogStore* pLogStore) { + cJSON* pJson = logStore2Json(pLogStore); + char* serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +// for debug +void logStorePrint(SSyncLogStore* pLogStore) { + char* s = logStore2Str(pLogStore); + // sTrace("%s", s); + fprintf(stderr, "logStorePrint: [len:%lu]| %s \n", strlen(s), s); + + free(s); +} \ No newline at end of file diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index da194780ff..42b2bd993b 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -14,7 +14,6 @@ */ #include "syncSnapshot.h" -#include "syncRaft.h" int32_t takeSnapshot(SSyncFSM *pFsm, SSnapshot *pSnapshot) { return 0; } diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index 216dccd62c..b78971bf37 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -148,4 +148,40 @@ const char* syncUtilState2String(ESyncState state) { } else { return "TAOS_SYNC_STATE_UNKNOWN"; } +} + +bool syncUtilCanPrint(char c) { + if (c >= 32 && c <= 126) { + return true; + } else { + return false; + } +} + +char* syncUtilprintBin(char* ptr, uint32_t len) { + char* s = malloc(len + 1); + assert(s != NULL); + memset(s, 0, len + 1); + memcpy(s, ptr, len); + + for (int i = 0; i < len; ++i) { + if (!syncUtilCanPrint(s[i])) { + s[i] = '.'; + } + } + return s; +} + +char* syncUtilprintBin2(char* ptr, uint32_t len) { + uint32_t len2 = len * 4 + 1; + char* s = malloc(len2); + assert(s != NULL); + memset(s, 0, len2); + + char* p = s; + for (int i = 0; i < len; ++i) { + int n = sprintf(p, "%d,", ptr[i]); + p += n; + } + return s; } \ No newline at end of file diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 5a5186c7e2..bfde08ffac 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -15,6 +15,8 @@ add_executable(syncUtilTest "") add_executable(syncVotesGrantedTest "") add_executable(syncVotesRespondTest "") add_executable(syncIndexMgrTest "") +add_executable(syncLogStoreTest "") +add_executable(syncEntryTest "") target_sources(syncTest @@ -85,6 +87,14 @@ target_sources(syncIndexMgrTest PRIVATE "syncIndexMgrTest.cpp" ) +target_sources(syncLogStoreTest + PRIVATE + "syncLogStoreTest.cpp" +) +target_sources(syncEntryTest + PRIVATE + "syncEntryTest.cpp" +) target_include_directories(syncTest @@ -172,6 +182,16 @@ target_include_directories(syncIndexMgrTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncLogStoreTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncEntryTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -242,6 +262,14 @@ target_link_libraries(syncIndexMgrTest sync gtest_main ) +target_link_libraries(syncLogStoreTest + sync + gtest_main +) +target_link_libraries(syncEntryTest + sync + gtest_main +) enable_testing() @@ -249,3 +277,5 @@ add_test( NAME sync_test COMMAND syncTest ) + + diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp index e2bc9a73ae..e1706bb40b 100644 --- a/source/libs/sync/test/syncEnqTest.cpp +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -2,6 +2,7 @@ #include "syncEnv.h" #include "syncIO.h" #include "syncInt.h" +#include "syncMessage.h" #include "syncRaftStore.h" void logTest() { diff --git a/source/libs/sync/test/syncEntryTest.cpp b/source/libs/sync/test/syncEntryTest.cpp new file mode 100644 index 0000000000..e54daaa79a --- /dev/null +++ b/source/libs/sync/test/syncEntryTest.cpp @@ -0,0 +1,81 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +void test1() { + SSyncRaftEntry* pEntry = syncEntryBuild(10); + assert(pEntry != NULL); + pEntry->msgType = 1; + pEntry->originalRpcType = 2; + pEntry->seqNum = 3; + pEntry->isWeak = true; + pEntry->term = 100; + pEntry->index = 200; + strcpy(pEntry->data, "test1"); + + syncEntryPrint(pEntry); + syncEntryDestory(pEntry); +} + +void test2() { + SyncClientRequest* pSyncMsg = syncClientRequestBuild(10); + pSyncMsg->originalRpcType = 33; + pSyncMsg->seqNum = 11; + pSyncMsg->isWeak = 1; + strcpy(pSyncMsg->data, "test2"); + + SSyncRaftEntry* pEntry = syncEntryBuild2(pSyncMsg, 100, 200); + syncEntryPrint(pEntry); + + syncClientRequestDestroy(pSyncMsg); + syncEntryDestory(pEntry); +} + +void test3() { + SSyncRaftEntry* pEntry = syncEntryBuild(10); + assert(pEntry != NULL); + pEntry->msgType = 11; + pEntry->originalRpcType = 22; + pEntry->seqNum = 33; + pEntry->isWeak = true; + pEntry->term = 44; + pEntry->index = 55; + strcpy(pEntry->data, "test3"); + syncEntryPrint(pEntry); + + uint32_t len; + char* serialized = syncEntrySerialize(pEntry, &len); + assert(serialized != NULL); + SSyncRaftEntry* pEntry2 = syncEntryDeserialize(serialized, len); + syncEntryPrint(pEntry2); + + free(serialized); + syncEntryDestory(pEntry2); + syncEntryDestory(pEntry); +} + +int main(int argc, char** argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + test1(); + test2(); + test3(); + + return 0; +} diff --git a/source/libs/sync/test/syncEnvTest.cpp b/source/libs/sync/test/syncEnvTest.cpp index 1ac4357c5a..101c0efe9a 100644 --- a/source/libs/sync/test/syncEnvTest.cpp +++ b/source/libs/sync/test/syncEnvTest.cpp @@ -3,6 +3,7 @@ #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" +#include "ttime.h" void logTest() { sTrace("--- sync log test: trace"); @@ -13,24 +14,13 @@ void logTest() { sFatal("--- sync log test: fatal"); } -void doSync() { - SSyncInfo syncInfo; - syncInfo.vgId = 1; +void *pTimer = NULL; +void *pTimerMgr = NULL; +int g = 300; - SSyncCfg* pCfg = &syncInfo.syncCfg; - pCfg->replicaNum = 3; - - pCfg->nodeInfo[0].nodePort = 7010; - taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); - - pCfg->nodeInfo[1].nodePort = 7110; - taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); - - pCfg->nodeInfo[2].nodePort = 7210; - taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); - - SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); - assert(pSyncNode != NULL); +static void timerFp(void *param, void *tmrId) { + printf("param:%p, tmrId:%p, pTimer:%p, pTimerMgr:%p \n", param, tmrId, pTimer, pTimerMgr); + taosTmrReset(timerFp, 1000, param, pTimerMgr, &pTimer); } int main() { @@ -41,13 +31,12 @@ int main() { logTest(); - // ret = syncIOStart(); - // assert(ret == 0); - ret = syncEnvStart(); assert(ret == 0); - // doSync(); + // timer + pTimerMgr = taosTmrInit(1000, 50, 10000, "SYNC-ENV-TEST"); + taosTmrStart(timerFp, 1000, &g, pTimerMgr); while (1) { taosMsleep(1000); diff --git a/source/libs/sync/test/syncIndexMgrTest.cpp b/source/libs/sync/test/syncIndexMgrTest.cpp index 4e4cd9222b..6bad8f09cf 100644 --- a/source/libs/sync/test/syncIndexMgrTest.cpp +++ b/source/libs/sync/test/syncIndexMgrTest.cpp @@ -33,8 +33,7 @@ SSyncNode* syncNodeInit() { syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); - snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path"); + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; diff --git a/source/libs/sync/test/syncInitTest.cpp b/source/libs/sync/test/syncInitTest.cpp index 669c4e68a5..b6794544eb 100644 --- a/source/libs/sync/test/syncInitTest.cpp +++ b/source/libs/sync/test/syncInitTest.cpp @@ -30,8 +30,7 @@ SSyncNode* syncNodeInit() { syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); - snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path"); + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; @@ -54,6 +53,7 @@ SSyncNode* syncNodeInit() { gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->pSyncNode = pSyncNode; return pSyncNode; @@ -89,11 +89,11 @@ int main(int argc, char** argv) { SSyncNode* pSyncNode = syncInitTest(); assert(pSyncNode != NULL); - char* serialized = syncNode2Str(pSyncNode); - printf("%s\n", serialized); - free(serialized); + syncNodePrint((char*)"syncInitTest", pSyncNode); initRaftId(pSyncNode); + //-------------------------------------------------------------- + return 0; -} +} \ No newline at end of file diff --git a/source/libs/sync/test/syncLogStoreTest.cpp b/source/libs/sync/test/syncLogStoreTest.cpp new file mode 100644 index 0000000000..a5eb748de6 --- /dev/null +++ b/source/libs/sync/test/syncLogStoreTest.cpp @@ -0,0 +1,141 @@ +#include +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftLog.h" +#include "syncRaftStore.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 1; +int32_t myIndex = 0; + +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM* pFsm; +SWal* pWal; +SSyncNode* pSyncNode; + +SSyncNode* syncNodeInit() { + syncInfo.vgId = 1234; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); + + int code = walInit(); + assert(code == 0); + SWalCfg walCfg; + memset(&walCfg, 0, sizeof(SWalCfg)); + walCfg.vgId = syncInfo.vgId; + walCfg.fsyncPeriod = 1000; + walCfg.retentionPeriod = 1000; + walCfg.rollPeriod = 1000; + walCfg.retentionSize = 1000; + walCfg.segSize = 1000; + walCfg.level = TAOS_WAL_FSYNC; + pWal = walOpen("./wal_test", &walCfg); + assert(pWal != NULL); + + syncInfo.pWal = pWal; + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = replicaNum; + + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } + + pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +SSyncNode* syncInitTest() { return syncNodeInit(); } + +void logStoreTest() { + logStorePrint(pSyncNode->pLogStore); + for (int i = 0; i < 5; ++i) { + int32_t dataLen = 10; + SSyncRaftEntry* pEntry = syncEntryBuild(dataLen); + assert(pEntry != NULL); + pEntry->msgType = 1; + pEntry->originalRpcType = 2; + pEntry->seqNum = 3; + pEntry->isWeak = true; + pEntry->term = 100; + pEntry->index = pSyncNode->pLogStore->getLastIndex(pSyncNode->pLogStore) + 1; + snprintf(pEntry->data, dataLen, "value%d", i); + + //syncEntryPrint2((char*)"write entry:", pEntry); + pSyncNode->pLogStore->appendEntry(pSyncNode->pLogStore, pEntry); + syncEntryDestory(pEntry); + } + logStorePrint(pSyncNode->pLogStore); + + pSyncNode->pLogStore->truncate(pSyncNode->pLogStore, 3); + logStorePrint(pSyncNode->pLogStore); +} + +void initRaftId(SSyncNode* pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char* s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } +} + +int main(int argc, char** argv) { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + } + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + pSyncNode = syncInitTest(); + assert(pSyncNode != NULL); + + //syncNodePrint((char*)"syncLogStoreTest", pSyncNode); + //initRaftId(pSyncNode); + + logStoreTest(); + + return 0; +} diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 450e097cc8..83f1f67eb1 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -1,8 +1,10 @@ +#include #include #include "syncEnv.h" #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" +#include "syncUtil.h" void logTest() { sTrace("--- sync log test: trace"); @@ -13,59 +15,65 @@ void logTest() { sFatal("--- sync log test: fatal"); } -uint16_t ports[3] = {7010, 7110, 7210}; +uint16_t ports[] = {7010, 7110, 7210, 7310, 7410}; +int32_t replicaNum = 3; +int32_t myIndex = 0; -SSyncNode* doSync(int myIndex) { - SSyncFSM* pFsm; +SRaftId ids[TSDB_MAX_REPLICA]; +SSyncInfo syncInfo; +SSyncFSM* pFsm; - SSyncInfo syncInfo; - syncInfo.vgId = 1; +SSyncNode* syncNodeInit() { + syncInfo.vgId = 1234; syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./path"); - snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./wal_path"); + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; - pCfg->replicaNum = 3; + pCfg->replicaNum = replicaNum; - pCfg->nodeInfo[0].nodePort = ports[0]; - snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1"); - // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); - - pCfg->nodeInfo[1].nodePort = ports[1]; - snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1"); - // taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); - - pCfg->nodeInfo[2].nodePort = ports[2]; - snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1"); - // taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + for (int i = 0; i < replicaNum; ++i) { + pCfg->nodeInfo[i].nodePort = ports[i]; + snprintf(pCfg->nodeInfo[i].nodeFqdn, sizeof(pCfg->nodeInfo[i].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + } SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote; + gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply; + gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries; + gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; gSyncIO->pSyncNode = pSyncNode; return pSyncNode; } -void timerPingAll(void* param, void* tmrId) { - SSyncNode* pSyncNode = (SSyncNode*)param; - syncNodePingAll(pSyncNode); +SSyncNode* syncInitTest() { return syncNodeInit(); } + +void initRaftId(SSyncNode* pSyncNode) { + for (int i = 0; i < replicaNum; ++i) { + ids[i] = pSyncNode->replicasId[i]; + char* s = syncUtilRaftId2Str(&ids[i]); + printf("raftId[%d] : %s\n", i, s); + free(s); + } } int main(int argc, char** argv) { - // taosInitLog((char*)"syncPingTest.log", 100000, 10); + // taosInitLog((char *)"syncTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; - logTest(); - - int myIndex = 0; + myIndex = 0; if (argc >= 2) { myIndex = atoi(argv[1]); } @@ -76,30 +84,45 @@ int main(int argc, char** argv) { ret = syncEnvStart(); assert(ret == 0); - SSyncNode* pSyncNode = doSync(myIndex); - gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; - gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; - gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; + SSyncNode* pSyncNode = syncInitTest(); + assert(pSyncNode != NULL); + syncNodePrint((char*)"----1", pSyncNode); + initRaftId(pSyncNode); + + //--------------------------- + + sTrace("syncNodeStartPingTimer ..."); ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); + syncNodePrint((char*)"----2", pSyncNode); + sTrace("sleep ..."); taosMsleep(10000); + sTrace("syncNodeStopPingTimer ..."); ret = syncNodeStopPingTimer(pSyncNode); assert(ret == 0); + syncNodePrint((char*)"----3", pSyncNode); - taosMsleep(10000); + sTrace("sleep ..."); + taosMsleep(5000); + sTrace("syncNodeStartPingTimer ..."); ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); + syncNodePrint((char*)"----4", pSyncNode); + sTrace("sleep ..."); taosMsleep(10000); + sTrace("syncNodeStopPingTimer ..."); ret = syncNodeStopPingTimer(pSyncNode); assert(ret == 0); + syncNodePrint((char*)"----5", pSyncNode); while (1) { + sTrace("while 1 sleep ..."); taosMsleep(1000); } diff --git a/source/libs/sync/test/syncVotesGrantedTest.cpp b/source/libs/sync/test/syncVotesGrantedTest.cpp index 3edde509f8..504fa3034a 100644 --- a/source/libs/sync/test/syncVotesGrantedTest.cpp +++ b/source/libs/sync/test/syncVotesGrantedTest.cpp @@ -32,8 +32,7 @@ SSyncNode* syncNodeInit() { syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); - snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path"); + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex; diff --git a/source/libs/sync/test/syncVotesRespondTest.cpp b/source/libs/sync/test/syncVotesRespondTest.cpp index 74d42cd531..0b6abef212 100644 --- a/source/libs/sync/test/syncVotesRespondTest.cpp +++ b/source/libs/sync/test/syncVotesRespondTest.cpp @@ -32,8 +32,7 @@ SSyncNode* syncNodeInit() { syncInfo.queue = gSyncIO->pMsgQ; syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; - snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_path"); - snprintf(syncInfo.walPath, sizeof(syncInfo.walPath), "%s", "./test_wal_path"); + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = myIndex;