diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index a619e66622..cd04783dbc 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -34,7 +34,9 @@ typedef enum { TAOS_SYNC_STATE_FOLLOWER = 0, TAOS_SYNC_STATE_CANDIDATE = 1, TAOS_SYNC_STATE_LEADER = 2, -} ESyncState; +} ESyncRole; + +typedef ESyncRole ESyncState; typedef struct SSyncBuffer { void* data; @@ -69,15 +71,15 @@ typedef struct SSyncFSM { // when value in pBuf finish a raft flow, FpCommitCb is called, code indicates the result // user can do something according to the code and isWeak. for example, write data into tsdb - void (*FpCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code); + void (*FpCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); // when value in pBuf has been written into local log store, FpPreCommitCb is called, code indicates the result // user can do something according to the code and isWeak. for example, write data into tsdb - void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code); + void (*FpPreCommitCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); // when log entry is updated by a new one, FpRollBackCb is called // user can do something to roll back. for example, delete data from tsdb, or just ignore it - void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SSyncBuffer* pBuf, SyncIndex index, bool isWeak, int32_t code); + void (*FpRollBackCb)(struct SSyncFSM* pFsm, const SRpcMsg* pBuf, SyncIndex index, bool isWeak, int32_t code); // user should implement this function, use "data" to take snapshot into "snapshot" int32_t (*FpTakeSnapshot)(SSnapshot* snapshot); @@ -93,10 +95,10 @@ typedef struct SSyncLogStore { void* data; // append one log entry - int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SSyncBuffer* pBuf); + int32_t (*appendEntry)(struct SSyncLogStore* pLogStore, SRpcMsg* pBuf); // get one log entry, user need to free pBuf->data - int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncBuffer* pBuf); + int32_t (*getEntry)(struct SSyncLogStore* pLogStore, SyncIndex index, SRpcMsg* pBuf); // update log store commit index with "index" int32_t (*updateCommitIndex)(struct SSyncLogStore* pLogStore, SyncIndex index); @@ -135,7 +137,9 @@ typedef struct SSyncInfo { SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; SSyncFSM* pFsm; - int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg); + + void* rpcClient; + int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); } SSyncInfo; @@ -149,8 +153,8 @@ int64_t syncStart(const SSyncInfo* pSyncInfo); void syncStop(int64_t rid); int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); -// int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); -int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak); +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); +// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak); ESyncState syncGetMyRole(int64_t rid); void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole); diff --git a/source/libs/sync/inc/syncEnv.h b/source/libs/sync/inc/syncEnv.h index f1c4327b69..7e60822a28 100644 --- a/source/libs/sync/inc/syncEnv.h +++ b/source/libs/sync/inc/syncEnv.h @@ -26,19 +26,26 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" #include "trpc.h" +#include "ttimer.h" + +#define TIMER_MAX_MS 0x7FFFFFFF typedef struct SSyncEnv { - void *pTimer; - void *pTimerManager; + tmr_h pEnvTickTimer; + tmr_h pTimerManager; + char name[128]; + } SSyncEnv; +extern SSyncEnv* gSyncEnv; + int32_t syncEnvStart(); int32_t syncEnvStop(); -static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv); +tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void* param); -static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv); +void syncEnvStopTimer(tmr_h* pTimer); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 4b788efd79..238948b403 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -30,42 +30,37 @@ extern "C" { #include "trpc.h" typedef struct SSyncIO { - void * serverRpc; - void * clientRpc; STaosQueue *pMsgQ; STaosQset * pQset; - pthread_t tid; - int8_t isStart; + pthread_t consumerTid; - SEpSet epSet; + void * serverRpc; + void * clientRpc; + SEpSet myAddr; - void *syncTimer; - void *syncTimerManager; - - int32_t (*start)(struct SSyncIO *ths); - int32_t (*stop)(struct SSyncIO *ths); - int32_t (*ping)(struct SSyncIO *ths); - - int32_t (*onMsg)(struct SSyncIO *ths, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); - int32_t (*destroy)(struct SSyncIO *ths); + void *ioTimerTickQ; + void *ioTimerTickPing; + void *ioTimerManager; void *pSyncNode; - int32_t (*FpOnPing)(struct SSyncNode *ths, SyncPing *pMsg); + int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg); + int32_t (*FpOnSyncPingReply)(SSyncNode *pSyncNode, SyncPingReply *pMsg); + int32_t (*FpOnSyncRequestVote)(SSyncNode *pSyncNode, SyncRequestVote *pMsg); + int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg); + int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg); + int32_t (*FpOnSyncAppendEntriesReply)(SSyncNode *pSyncNode, SyncAppendEntriesReply *pMsg); + + int8_t isStart; } SSyncIO; extern SSyncIO *gSyncIO; -int32_t syncIOStart(); -int32_t syncIOStop(); -int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg); -SSyncIO *syncIOCreate(); - -static int32_t doSyncIOStart(SSyncIO *io); -static int32_t doSyncIOStop(SSyncIO *io); -static int32_t doSyncIOPing(SSyncIO *io); -static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); -static int32_t doSyncIODestroy(SSyncIO *io); +int32_t syncIOStart(char *host, uint16_t port); +int32_t syncIOStop(); +int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t syncIOTickQ(); +int32_t syncIOTickPing(); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index ad8484662a..0901330488 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -26,6 +26,7 @@ extern "C" { #include "sync.h" #include "taosdef.h" #include "tlog.h" +#include "ttimer.h" extern int32_t sDebugFlag; @@ -47,23 +48,23 @@ extern int32_t sDebugFlag; taosPrintLog("SYN WARN ", sDebugFlag, __VA_ARGS__); \ } \ } -#define sInfo(...) \ - { \ - if (sDebugFlag & DEBUG_INFO) { \ - taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \ - } \ +#define sInfo(...) \ + { \ + if (sDebugFlag & DEBUG_INFO) { \ + taosPrintLog("SYN INFO ", sDebugFlag, __VA_ARGS__); \ + } \ } -#define sDebug(...) \ - { \ - if (sDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \ - } \ +#define sDebug(...) \ + { \ + if (sDebugFlag & DEBUG_DEBUG) { \ + taosPrintLog("SYN DEBUG ", sDebugFlag, __VA_ARGS__); \ + } \ } -#define sTrace(...) \ - { \ - if (sDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("SYN ", sDebugFlag, __VA_ARGS__); \ - } \ +#define sTrace(...) \ + { \ + if (sDebugFlag & DEBUG_TRACE) { \ + taosPrintLog("SYN TRACE ", sDebugFlag, __VA_ARGS__); \ + } \ } struct SRaft; @@ -87,35 +88,64 @@ typedef struct SyncAppendEntries SyncAppendEntries; struct SyncAppendEntriesReply; typedef struct SyncAppendEntriesReply SyncAppendEntriesReply; -typedef struct SSyncNode { - int8_t replica; - int8_t quorum; +struct SSyncEnv; +typedef struct SSyncEnv SSyncEnv; +typedef struct SRaftId { + SyncNodeId addr; // typedef uint64_t SyncNodeId; + SyncGroupId vgId; // typedef int32_t SyncGroupId; +} SRaftId; + +typedef struct SSyncNode { SyncGroupId vgId; SSyncCfg syncCfg; char path[TSDB_FILENAME_LEN]; + SSyncFSM* pFsm; - SRaft* pRaft; + // passed from outside + void* rpcClient; + int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); - int32_t (*FpPing)(struct SSyncNode* ths, const SyncPing* pMsg); + int32_t refCount; + int64_t rid; - int32_t (*FpOnPing)(struct SSyncNode* ths, SyncPing* pMsg); + SNodeInfo me; + SNodeInfo peers[TSDB_MAX_REPLICA]; + int32_t peersNum; - int32_t (*FpOnPingReply)(struct SSyncNode* ths, SyncPingReply* pMsg); + ESyncRole role; + SRaftId raftId; - int32_t (*FpRequestVote)(struct SSyncNode* ths, const SyncRequestVote* pMsg); + tmr_h pPingTimer; + int32_t pingTimerMS; + uint8_t pingTimerStart; + TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp + uint64_t pingTimerCounter; - int32_t (*FpOnRequestVote)(struct SSyncNode* ths, SyncRequestVote* pMsg); + tmr_h pElectTimer; + int32_t electTimerMS; + uint8_t electTimerStart; + TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp + uint64_t electTimerCounter; - int32_t (*FpOnRequestVoteReply)(struct SSyncNode* ths, SyncRequestVoteReply* pMsg); + tmr_h pHeartbeatTimer; + int32_t heartbeatTimerMS; + uint8_t heartbeatTimerStart; + TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp + uint64_t heartbeatTimerCounter; - int32_t (*FpAppendEntries)(struct SSyncNode* ths, const SyncAppendEntries* pMsg); + // callback + int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg); - int32_t (*FpOnAppendEntries)(struct SSyncNode* ths, SyncAppendEntries* pMsg); + int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg); - int32_t (*FpOnAppendEntriesReply)(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); + int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg); - int32_t (*FpSendMsg)(void* handle, const SEpSet* pEpSet, SRpcMsg* pMsg); + int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg); + + int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg); + + int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); } SSyncNode; @@ -123,23 +153,15 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo); void syncNodeClose(SSyncNode* pSyncNode); -static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg); +void syncNodePingAll(SSyncNode* pSyncNode); -static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg); +void syncNodePingPeers(SSyncNode* pSyncNode); -static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg); +void syncNodePingSelf(SSyncNode* pSyncNode); -static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg); +int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); -static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg); - -static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg); - -static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg); - -static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg); - -static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg); +int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 2ee5e0109c..be53559e8a 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -23,13 +23,15 @@ extern "C" { #include #include #include +#include "cJSON.h" #include "sync.h" #include "syncRaftEntry.h" #include "taosdef.h" +// encode as uint64 typedef enum ESyncMessageType { - SYNC_PING = 0, - SYNC_PING_REPLY, + SYNC_PING = 101, + SYNC_PING_REPLY = 103, SYNC_CLIENT_REQUEST, SYNC_CLIENT_REQUEST_REPLY, SYNC_REQUEST_VOTE, @@ -38,29 +40,86 @@ typedef enum ESyncMessageType { SYNC_APPEND_ENTRIES_REPLY, } ESyncMessageType; +/* +typedef struct SRaftId { + SyncNodeId addr; // typedef uint64_t SyncNodeId; + SyncGroupId vgId; // typedef int32_t SyncGroupId; +} SRaftId; +*/ + typedef struct SyncPing { - ESyncMessageType msgType; - const SSyncBuffer *pData; -} SyncPing, RaftPing; + uint32_t bytes; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + uint32_t dataLen; + char data[]; +} SyncPing; + +#define SYNC_PING_FIX_LEN (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t)) + +SyncPing* syncPingBuild(uint32_t dataLen); + +void syncPingDestroy(SyncPing* pMsg); + +void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen); + +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg); + +void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg); + +void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg); + +cJSON* syncPing2Json(const SyncPing* pMsg); + +SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str); + +SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId); typedef struct SyncPingReply { - ESyncMessageType msgType; - const SSyncBuffer *pData; -} SyncPingReply, RaftPingReply; + uint32_t bytes; + uint32_t msgType; + SRaftId srcId; + SRaftId destId; + uint32_t dataLen; + char data[]; +} SyncPingReply; + +#define SYNC_PING_REPLY_FIX_LEN \ + (sizeof(uint32_t) + sizeof(uint32_t) + sizeof(SRaftId) + sizeof(SRaftId) + sizeof(uint32_t)) + +SyncPingReply* syncPingReplyBuild(uint32_t dataLen); + +void syncPingReplyDestroy(SyncPingReply* pMsg); + +void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen); + +void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg); + +void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg); + +void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg); + +cJSON* syncPingReply2Json(const SyncPingReply* pMsg); + +SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str); + +SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId); typedef struct SyncClientRequest { - ESyncMessageType msgType; - const SSyncBuffer *pData; - int64_t seqNum; - bool isWeak; -} SyncClientRequest, RaftClientRequest; + ESyncMessageType msgType; + char* data; + uint32_t dataLen; + int64_t seqNum; + bool isWeak; +} SyncClientRequest; typedef struct SyncClientRequestReply { - ESyncMessageType msgType; - int32_t errCode; - const SSyncBuffer *pErrMsg; - const SSyncBuffer *pLeaderHint; -} SyncClientRequestReply, RaftClientRequestReply; + ESyncMessageType msgType; + int32_t errCode; + SSyncBuffer* pErrMsg; + SSyncBuffer* pLeaderHint; +} SyncClientRequestReply; typedef struct SyncRequestVote { ESyncMessageType msgType; @@ -69,7 +128,7 @@ typedef struct SyncRequestVote { SyncGroupId vgId; SyncIndex lastLogIndex; SyncTerm lastLogTerm; -} SyncRequestVote, RaftRequestVote; +} SyncRequestVote; typedef struct SyncRequestVoteReply { ESyncMessageType msgType; @@ -77,7 +136,7 @@ typedef struct SyncRequestVoteReply { SyncNodeId nodeId; SyncGroupId vgId; bool voteGranted; -} SyncRequestVoteReply, RaftRequestVoteReply; +} SyncRequestVoteReply; typedef struct SyncAppendEntries { ESyncMessageType msgType; @@ -86,9 +145,9 @@ typedef struct SyncAppendEntries { SyncIndex prevLogIndex; SyncTerm prevLogTerm; int32_t entryCount; - SSyncRaftEntry * logEntries; + SSyncRaftEntry* logEntries; SyncIndex commitIndex; -} SyncAppendEntries, RaftAppendEntries; +} SyncAppendEntries; typedef struct SyncAppendEntriesReply { ESyncMessageType msgType; @@ -96,7 +155,7 @@ typedef struct SyncAppendEntriesReply { SyncNodeId nodeId; bool success; SyncIndex matchIndex; -} SyncAppendEntriesReply, RaftAppendEntriesReply; +} SyncAppendEntriesReply; #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncRaft.h b/source/libs/sync/inc/syncRaft.h index a247a29fc4..bc5cf26a4c 100644 --- a/source/libs/sync/inc/syncRaft.h +++ b/source/libs/sync/inc/syncRaft.h @@ -27,6 +27,8 @@ extern "C" { #include "syncMessage.h" #include "taosdef.h" +#if 0 + typedef struct SRaftId { SyncNodeId addr; SyncGroupId vgId; @@ -82,6 +84,8 @@ int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak); static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft); +#endif + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncRaftStore.h b/source/libs/sync/inc/syncRaftStore.h index 0fdbd7a150..c480486ff0 100644 --- a/source/libs/sync/inc/syncRaftStore.h +++ b/source/libs/sync/inc/syncRaftStore.h @@ -34,8 +34,9 @@ extern "C" { typedef struct SRaftStore { SyncTerm currentTerm; SRaftId voteFor; - //FileFd fd; - char path[RAFT_STORE_PATH_LEN]; + // FileFd fd; + TdFilePtr pFile; + char path[RAFT_STORE_PATH_LEN]; } SRaftStore; SRaftStore *raftStoreOpen(const char *path); diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h new file mode 100644 index 0000000000..93d2c12525 --- /dev/null +++ b/source/libs/sync/inc/syncUtil.h @@ -0,0 +1,57 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_LIBS_SYNC_UTIL_H +#define _TD_LIBS_SYNC_UTIL_H + +#ifdef __cplusplus +extern "C" { +#endif + +#include +#include +#include +#include "syncInt.h" +#include "syncMessage.h" +#include "taosdef.h" + +// ---- encode / decode + +uint64_t syncUtilAddr2U64(const char* host, uint16_t port); + +void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port); + +void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet); + +void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); + +void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); + +// ---- SSyncBuffer ---- +#if 0 +void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len); + +void syncUtilbufDestroy(SSyncBuffer* syncBuf); + +void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest); + +void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest); +#endif + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_LIBS_SYNC_UTIL_H*/ diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c index e71cf55cb1..a9cf035650 100644 --- a/source/libs/sync/src/syncEnv.c +++ b/source/libs/sync/src/syncEnv.c @@ -18,6 +18,14 @@ SSyncEnv *gSyncEnv = NULL; +// local function ----------------- +static void syncEnvTick(void *param, void *tmrId); +static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv); +static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv); +static tmr_h doSyncEnvStartTimer(SSyncEnv *pSyncEnv, TAOS_TMR_CALLBACK fp, int mseconds, void *param); +static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer); +// -------------------------------- + int32_t syncEnvStart() { int32_t ret; gSyncEnv = (SSyncEnv *)malloc(sizeof(SSyncEnv)); @@ -31,6 +39,40 @@ int32_t syncEnvStop() { return ret; } -static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) { return 0; } +tmr_h syncEnvStartTimer(TAOS_TMR_CALLBACK fp, int mseconds, void *param) { + return doSyncEnvStartTimer(gSyncEnv, fp, mseconds, param); +} -static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { return 0; } +void syncEnvStopTimer(tmr_h *pTimer) { doSyncEnvStopTimer(gSyncEnv, pTimer); } + +// local function ----------------- +static void syncEnvTick(void *param, void *tmrId) { + SSyncEnv *pSyncEnv = (SSyncEnv *)param; + sTrace("syncEnvTick ... name:%s ", pSyncEnv->name); + + pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager); +} + +static int32_t doSyncEnvStart(SSyncEnv *pSyncEnv) { + snprintf(pSyncEnv->name, sizeof(pSyncEnv->name), "SyncEnv_%p", pSyncEnv); + + // start tmr thread + pSyncEnv->pTimerManager = taosTmrInit(1000, 50, 10000, "SYNC-ENV"); + + // pSyncEnv->pEnvTickTimer = taosTmrStart(syncEnvTick, 1000, pSyncEnv, pSyncEnv->pTimerManager); + + sTrace("SyncEnv start ok, name:%s", pSyncEnv->name); + + return 0; +} + +static int32_t doSyncEnvStop(SSyncEnv *pSyncEnv) { + taosTmrCleanUp(pSyncEnv->pTimerManager); + return 0; +} + +static tmr_h doSyncEnvStartTimer(SSyncEnv *pSyncEnv, TAOS_TMR_CALLBACK fp, int mseconds, void *param) { + return taosTmrStart(fp, mseconds, pSyncEnv, pSyncEnv->pTimerManager); +} + +static void doSyncEnvStopTimer(SSyncEnv *pSyncEnv, tmr_h *pTimer) {} diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index bb20d11e37..8d04b5bb26 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -22,118 +22,72 @@ SSyncIO *gSyncIO = NULL; -int32_t syncIOSendMsg(void *handle, const SEpSet *pEpSet, SRpcMsg *pMsg) { return 0; } +// local function ------------ +static int32_t syncIOStartInternal(SSyncIO *io); +static int32_t syncIOStopInternal(SSyncIO *io); +static SSyncIO *syncIOCreate(char *host, uint16_t port); +static int32_t syncIODestroy(SSyncIO *io); -int32_t syncIOStart() { return 0; } +static void *syncIOConsumerFunc(void *param); +static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); +static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); +static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); -int32_t syncIOStop() { return 0; } +static int32_t syncIOTickQInternal(SSyncIO *io); +static void syncIOTickQFunc(void *param, void *tmrId); +static int32_t syncIOTickPingInternal(SSyncIO *io); +static void syncIOTickPingFunc(void *param, void *tmrId); +// ---------------------------- -static void syncTick(void *param, void *tmrId) { - SSyncIO *io = (SSyncIO *)param; - sDebug("syncTick ... "); - - SRpcMsg rpcMsg; - rpcMsg.pCont = rpcMallocCont(10); - snprintf(rpcMsg.pCont, 10, "TICK"); - rpcMsg.contLen = 10; - rpcMsg.handle = NULL; - rpcMsg.msgType = 2; - - SRpcMsg *pTemp; - - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); - memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); - - taosWriteQitem(io->pMsgQ, pTemp); - - io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager); +// public function ------------ +int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { + sTrace( + "<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, " + "pMsg->msgType:%d, pMsg->contLen:%d", + clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle, + pMsg->msgType, pMsg->contLen); + pMsg->handle = NULL; + rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); + return 0; } -void *syncConsumer(void *param) { - SSyncIO *io = param; +int32_t syncIOStart(char *host, uint16_t port) { + gSyncIO = syncIOCreate(host, port); + assert(gSyncIO != NULL); - STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; - int type; + int32_t ret = syncIOStartInternal(gSyncIO); + assert(ret == 0); - qall = taosAllocateQall(); - - while (1) { - int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL); - sDebug("%d sync-io msgs are received", numOfMsgs); - if (numOfMsgs <= 0) break; - - for (int i = 0; i < numOfMsgs; ++i) { - taosGetQitem(qall, (void **)&pRpcMsg); - sDebug("sync-io recv type:%d msg:%s", pRpcMsg->msgType, (char *)(pRpcMsg->pCont)); - } - - taosResetQitems(qall); - for (int i = 0; i < numOfMsgs; ++i) { - taosGetQitem(qall, (void **)&pRpcMsg); - rpcFreeCont(pRpcMsg->pCont); - - if (pRpcMsg->handle != NULL) { - int msgSize = 128; - memset(&rpcMsg, 0, sizeof(rpcMsg)); - rpcMsg.pCont = rpcMallocCont(msgSize); - rpcMsg.contLen = msgSize; - rpcMsg.handle = pRpcMsg->handle; - rpcMsg.code = 0; - rpcSendResponse(&rpcMsg); - } - - taosFreeQitem(pRpcMsg); - } - } - - taosFreeQall(qall); - return NULL; + sTrace("syncIOStart ok, gSyncIO:%p gSyncIO->clientRpc:%p", gSyncIO, gSyncIO->clientRpc); + return 0; } -static int retrieveAuthInfo(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { - // app shall retrieve the auth info based on meterID from DB or a data file - // demo code here only for simple demo - int ret = 0; +int32_t syncIOStop() { + int32_t ret = syncIOStopInternal(gSyncIO); + assert(ret == 0); + + ret = syncIODestroy(gSyncIO); + assert(ret == 0); return ret; } -static void processResponse(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - sDebug("processResponse ... "); - rpcFreeCont(pMsg->pCont); +int32_t syncIOTickQ() { + int32_t ret = syncIOTickQInternal(gSyncIO); + assert(ret == 0); + return ret; } -static void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { - SSyncIO *io = pParent; - SRpcMsg *pTemp; - - pTemp = taosAllocateQitem(sizeof(SRpcMsg)); - memcpy(pTemp, pMsg, sizeof(SRpcMsg)); - - sDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp); - taosWriteQitem(io->pMsgQ, pTemp); +int32_t syncIOTickPing() { + int32_t ret = syncIOTickPingInternal(gSyncIO); + assert(ret == 0); + return ret; } -SSyncIO *syncIOCreate() { - SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO)); - memset(io, 0, sizeof(*io)); - - io->pMsgQ = taosOpenQueue(); - io->pQset = taosOpenQset(); - taosAddIntoQset(io->pQset, io->pMsgQ, NULL); - - io->start = doSyncIOStart; - io->stop = doSyncIOStop; - io->ping = doSyncIOPing; - io->onMsg = doSyncIOOnMsg; - io->destroy = doSyncIODestroy; - - return io; -} - -static int32_t doSyncIOStart(SSyncIO *io) { +// local function ------------ +static int32_t syncIOStartInternal(SSyncIO *io) { taosBlockSIGPIPE(); + rpcInit(); tsRpcForceTcp = 1; // cient rpc init @@ -143,7 +97,7 @@ static int32_t doSyncIOStart(SSyncIO *io) { rpcInit.localPort = 0; rpcInit.label = "SYNC-IO-CLIENT"; rpcInit.numOfThreads = 1; - rpcInit.cfp = processResponse; + rpcInit.cfp = syncIOProcessReply; rpcInit.sessions = 100; rpcInit.idleTime = 100; rpcInit.user = "sync-io"; @@ -163,13 +117,13 @@ static int32_t doSyncIOStart(SSyncIO *io) { { SRpcInit rpcInit; memset(&rpcInit, 0, sizeof(rpcInit)); - rpcInit.localPort = 38000; + rpcInit.localPort = io->myAddr.eps[0].port; rpcInit.label = "SYNC-IO-SERVER"; rpcInit.numOfThreads = 1; - rpcInit.cfp = processRequestMsg; + rpcInit.cfp = syncIOProcessRequest; rpcInit.sessions = 1000; rpcInit.idleTime = 2 * 1500; - rpcInit.afp = retrieveAuthInfo; + rpcInit.afp = syncIOAuth; rpcInit.parent = io; rpcInit.connType = TAOS_CONN_SERVER; @@ -180,12 +134,9 @@ static int32_t doSyncIOStart(SSyncIO *io) { } } - io->epSet.inUse = 0; - addEpIntoEpSet(&io->epSet, "127.0.0.1", 38000); - // start consumer thread { - if (pthread_create(&io->tid, NULL, syncConsumer, io) != 0) { + if (pthread_create(&io->consumerTid, NULL, syncIOConsumerFunc, io) != 0) { sError("failed to create sync consumer thread since %s", strerror(errno)); terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -193,35 +144,32 @@ static int32_t doSyncIOStart(SSyncIO *io) { } // start tmr thread - io->syncTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); - io->syncTimer = taosTmrStart(syncTick, 1000, io, io->syncTimerManager); + io->ioTimerManager = taosTmrInit(1000, 50, 10000, "SYNC"); return 0; } -static int32_t doSyncIOStop(SSyncIO *io) { +static int32_t syncIOStopInternal(SSyncIO *io) { atomic_store_8(&io->isStart, 0); - pthread_join(io->tid, NULL); + pthread_join(io->consumerTid, NULL); return 0; } -static int32_t doSyncIOPing(SSyncIO *io) { - SRpcMsg rpcMsg, rspMsg; +static SSyncIO *syncIOCreate(char *host, uint16_t port) { + SSyncIO *io = (SSyncIO *)malloc(sizeof(SSyncIO)); + memset(io, 0, sizeof(*io)); - rpcMsg.pCont = rpcMallocCont(10); - snprintf(rpcMsg.pCont, 10, "ping"); - rpcMsg.contLen = 10; - rpcMsg.handle = NULL; - rpcMsg.msgType = 1; + io->pMsgQ = taosOpenQueue(); + io->pQset = taosOpenQset(); + taosAddIntoQset(io->pQset, io->pMsgQ, NULL); - rpcSendRequest(io->clientRpc, &io->epSet, &rpcMsg, NULL); + io->myAddr.inUse = 0; + addEpIntoEpSet(&io->myAddr, host, port); - return 0; + return io; } -static int32_t doSyncIOOnMsg(struct SSyncIO *io, void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { return 0; } - -static int32_t doSyncIODestroy(SSyncIO *io) { +static int32_t syncIODestroy(SSyncIO *io) { int8_t start = atomic_load_8(&io->isStart); assert(start == 0); @@ -235,15 +183,149 @@ static int32_t doSyncIODestroy(SSyncIO *io) { io->clientRpc = NULL; } - if (io->pMsgQ != NULL) { - free(io->pMsgQ); - io->pMsgQ = NULL; - } - - if (io->pQset != NULL) { - free(io->pQset); - io->pQset = NULL; - } + taosCloseQueue(io->pMsgQ); + taosCloseQset(io->pQset); return 0; } + +static void *syncIOConsumerFunc(void *param) { + SSyncIO *io = param; + + STaosQall *qall; + SRpcMsg * pRpcMsg, rpcMsg; + int type; + + qall = taosAllocateQall(); + + while (1) { + int numOfMsgs = taosReadAllQitemsFromQset(io->pQset, qall, NULL, NULL); + sTrace("syncIOConsumerFunc %d msgs are received", numOfMsgs); + if (numOfMsgs <= 0) break; + + for (int i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pRpcMsg); + sTrace("syncIOConsumerFunc get item from queue: msgType:%d contLen:%d msg:%s", pRpcMsg->msgType, pRpcMsg->contLen, + (char *)(pRpcMsg->pCont)); + + if (pRpcMsg->msgType == SYNC_PING) { + if (io->FpOnSyncPing != NULL) { + SyncPing *pSyncMsg; + + SRpcMsg tmpRpcMsg; + memcpy(&tmpRpcMsg, pRpcMsg, sizeof(SRpcMsg)); + pSyncMsg = syncPingBuild(tmpRpcMsg.contLen); + + syncPingFromRpcMsg(pRpcMsg, pSyncMsg); + + // memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen); + + io->FpOnSyncPing(io->pSyncNode, pSyncMsg); + } + + } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { + SyncPingReply *pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen); + syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg); + + if (io->FpOnSyncPingReply != NULL) { + io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); + } + } else { + ; + } + } + + taosResetQitems(qall); + for (int i = 0; i < numOfMsgs; ++i) { + taosGetQitem(qall, (void **)&pRpcMsg); + rpcFreeCont(pRpcMsg->pCont); + + if (pRpcMsg->handle != NULL) { + int msgSize = 128; + memset(&rpcMsg, 0, sizeof(rpcMsg)); + rpcMsg.pCont = rpcMallocCont(msgSize); + rpcMsg.contLen = msgSize; + snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "give a reply"); + rpcMsg.handle = pRpcMsg->handle; + rpcMsg.code = 0; + + sTrace("syncIOConsumerFunc rpcSendResponse ... msgType:%d contLen:%d", pRpcMsg->msgType, rpcMsg.contLen); + rpcSendResponse(&rpcMsg); + } + + taosFreeQitem(pRpcMsg); + } + } + + taosFreeQall(qall); + return NULL; +} + +static int syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey) { + // app shall retrieve the auth info based on meterID from DB or a data file + // demo code here only for simple demo + int ret = 0; + return ret; +} + +static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { + sTrace("<-- syncIOProcessRequest --> type:%d, contLen:%d, cont:%s", pMsg->msgType, pMsg->contLen, + (char *)pMsg->pCont); + + SSyncIO *io = pParent; + SRpcMsg *pTemp; + + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, pMsg, sizeof(SRpcMsg)); + + taosWriteQitem(io->pMsgQ, pTemp); +} + +static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) { + sTrace("syncIOProcessReply: type:%d, contLen:%d msg:%s", pMsg->msgType, pMsg->contLen, (char *)pMsg->pCont); + rpcFreeCont(pMsg->pCont); +} + +static int32_t syncIOTickQInternal(SSyncIO *io) { + io->ioTimerTickQ = taosTmrStart(syncIOTickQFunc, 1000, io, io->ioTimerManager); + return 0; +} + +static void syncIOTickQFunc(void *param, void *tmrId) { + SSyncIO *io = (SSyncIO *)param; + sTrace("<-- syncIOTickQFunc -->"); + + SRpcMsg rpcMsg; + rpcMsg.contLen = 64; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOTickQ"); + rpcMsg.handle = NULL; + rpcMsg.msgType = 55; + + SRpcMsg *pTemp; + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, &rpcMsg, sizeof(SRpcMsg)); + + taosWriteQitem(io->pMsgQ, pTemp); + taosTmrReset(syncIOTickQFunc, 1000, io, io->ioTimerManager, &io->ioTimerTickQ); +} + +static int32_t syncIOTickPingInternal(SSyncIO *io) { + io->ioTimerTickPing = taosTmrStart(syncIOTickPingFunc, 1000, io, io->ioTimerManager); + return 0; +} + +static void syncIOTickPingFunc(void *param, void *tmrId) { + SSyncIO *io = (SSyncIO *)param; + sTrace("<-- syncIOTickPingFunc -->"); + + SRpcMsg rpcMsg; + rpcMsg.contLen = 64; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + snprintf(rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOTickPing"); + rpcMsg.handle = NULL; + rpcMsg.msgType = 77; + + rpcSendRequest(io->clientRpc, &io->myAddr, &rpcMsg, NULL); + taosTmrReset(syncIOTickPingFunc, 1000, io, io->ioTimerManager, &io->ioTimerTickPing); +} \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index bd2952505e..49fac038da 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -15,12 +15,36 @@ #include #include "sync.h" +#include "syncEnv.h" #include "syncInt.h" #include "syncRaft.h" +#include "syncUtil.h" -int32_t syncInit() { return 0; } +static int32_t tsNodeRefId = -1; -void syncCleanUp() {} +// ------ local funciton --------- +static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); +static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); +static void syncNodePingTimerCb(void* param, void* tmrId); + +static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); +static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg); +static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg); + +static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); +static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); +static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); +static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); +static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); +static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +// --------------------------------- + +int32_t syncInit() { + sTrace("syncInit ok"); + return 0; +} + +void syncCleanUp() { sTrace("syncCleanUp ok"); } int64_t syncStart(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = syncNodeOpen(pSyncInfo); @@ -32,7 +56,9 @@ void syncStop(int64_t rid) {} int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; } -int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; } +// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; } + +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; } ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } @@ -41,69 +67,238 @@ void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole) {} SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { SSyncNode* pSyncNode = (SSyncNode*)malloc(sizeof(SSyncNode)); assert(pSyncNode != NULL); + memset(pSyncNode, 0, sizeof(SSyncNode)); + pSyncNode->vgId = pSyncInfo->vgId; + pSyncNode->syncCfg = pSyncInfo->syncCfg; + memcpy(pSyncNode->path, pSyncInfo->path, sizeof(pSyncNode->path)); + pSyncNode->pFsm = pSyncInfo->pFsm; + + pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; - pSyncNode->FpPing = doSyncNodePing; - pSyncNode->FpOnPing = onSyncNodePing; - pSyncNode->FpOnPingReply = onSyncNodePingReply; - pSyncNode->FpRequestVote = doSyncNodeRequestVote; - pSyncNode->FpOnRequestVote = onSyncNodeRequestVote; - pSyncNode->FpOnRequestVoteReply = onSyncNodeRequestVoteReply; - pSyncNode->FpAppendEntries = doSyncNodeAppendEntries; - pSyncNode->FpOnAppendEntries = onSyncNodeAppendEntries; - pSyncNode->FpOnAppendEntriesReply = onSyncNodeAppendEntriesReply; + pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex]; + pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1; + + int j = 0; + for (int i = 0; i < pSyncInfo->syncCfg.replicaNum; ++i) { + if (i != pSyncInfo->syncCfg.myIndex) { + pSyncNode->peers[j] = pSyncInfo->syncCfg.nodeInfo[i]; + j++; + } + } + + pSyncNode->role = TAOS_SYNC_STATE_FOLLOWER; + syncUtilnodeInfo2raftId(&pSyncNode->me, pSyncNode->vgId, &pSyncNode->raftId); + + pSyncNode->pPingTimer = NULL; + pSyncNode->pingTimerMS = 1000; + atomic_store_8(&pSyncNode->pingTimerStart, 0); + pSyncNode->FpPingTimer = syncNodePingTimerCb; + pSyncNode->pingTimerCounter = 0; + + pSyncNode->FpOnPing = syncNodeOnPingCb; + pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; + pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb; + pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb; + pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb; + pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb; return pSyncNode; } void syncNodeClose(SSyncNode* pSyncNode) { assert(pSyncNode != NULL); - raftClose(pSyncNode->pRaft); free(pSyncNode); } -static int32_t doSyncNodePing(struct SSyncNode* ths, const SyncPing* pMsg) { - int32_t ret = ths->pRaft->FpPing(ths->pRaft, pMsg); +void syncNodePingAll(SSyncNode* pSyncNode) { + sTrace("syncNodePingAll pSyncNode:%p ", pSyncNode); + int32_t ret = 0; + for (int i = 0; i < pSyncNode->syncCfg.replicaNum; ++i) { + SRaftId destId; + syncUtilnodeInfo2raftId(&pSyncNode->syncCfg.nodeInfo[i], pSyncNode->vgId, &destId); + SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId); + ret = syncNodePing(pSyncNode, &destId, pMsg); + assert(ret == 0); + syncPingDestroy(pMsg); + } +} + +void syncNodePingPeers(SSyncNode* pSyncNode) { + int32_t ret = 0; + for (int i = 0; i < pSyncNode->peersNum; ++i) { + SRaftId destId; + syncUtilnodeInfo2raftId(&pSyncNode->peers[i], pSyncNode->vgId, &destId); + SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &destId); + ret = syncNodePing(pSyncNode, &destId, pMsg); + assert(ret == 0); + syncPingDestroy(pMsg); + } +} + +void syncNodePingSelf(SSyncNode* pSyncNode) { + int32_t ret; + SyncPing* pMsg = syncPingBuild3(&pSyncNode->raftId, &pSyncNode->raftId); + ret = syncNodePing(pSyncNode, &pMsg->destId, pMsg); + assert(ret == 0); + syncPingDestroy(pMsg); +} + +int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { + if (pSyncNode->pPingTimer == NULL) { + pSyncNode->pPingTimer = + taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager); + } else { + taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pPingTimer); + } + + atomic_store_8(&pSyncNode->pingTimerStart, 1); + return 0; +} + +int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { + atomic_store_8(&pSyncNode->pingTimerStart, 0); + pSyncNode->pingTimerCounter = TIMER_MAX_MS; + return 0; +} + +// ------ local funciton --------- +static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { + sTrace("syncNodePing pSyncNode:%p ", pSyncNode); + int32_t ret = 0; + + SRpcMsg rpcMsg; + syncPing2RpcMsg(pMsg, &rpcMsg); + + /* + SRpcMsg rpcMsg; + rpcMsg.contLen = 64; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + snprintf((char*)rpcMsg.pCont, rpcMsg.contLen, "%s", "xxxxxxxxxxxxxx"); + rpcMsg.handle = NULL; + rpcMsg.msgType = 1; + */ + + syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg); + + { + cJSON* pJson = syncPing2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("syncNodePing pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + { + SyncPing* pMsg2 = rpcMsg.pCont; + cJSON* pJson = syncPing2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + sTrace("syncNodePing rpcMsg.pCont:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + return ret; } -static int32_t onSyncNodePing(struct SSyncNode* ths, SyncPing* pMsg) { - int32_t ret = ths->pRaft->FpOnPing(ths->pRaft, pMsg); +static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg) { + int32_t ret = 0; return ret; } -static int32_t onSyncNodePingReply(struct SSyncNode* ths, SyncPingReply* pMsg) { - int32_t ret = ths->pRaft->FpOnPingReply(ths->pRaft, pMsg); +static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pMsg) { + int32_t ret = 0; return ret; } -static int32_t doSyncNodeRequestVote(struct SSyncNode* ths, const SyncRequestVote* pMsg) { - int32_t ret = ths->pRaft->FpRequestVote(ths->pRaft, pMsg); +static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { + sTrace("syncNodeSendMsgById pSyncNode:%p ", pSyncNode); + SEpSet epSet; + syncUtilraftId2EpSet(destRaftId, &epSet); + pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + return 0; +} + +static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg) { + SEpSet epSet; + syncUtilnodeInfo2EpSet(nodeInfo, &epSet); + pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); + return 0; +} + +static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { + int32_t ret = 0; + sTrace("<-- syncNodeOnPingCb -->"); + + { + cJSON* pJson = syncPing2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("syncNodeOnPingCb syncNodePing pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + SyncPingReply* pMsgReply = syncPingReplyBuild3(&ths->raftId, &pMsg->srcId); + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pMsgReply, &rpcMsg); + syncNodeSendMsgById(&pMsgReply->destId, ths, &rpcMsg); + return ret; } -static int32_t onSyncNodeRequestVote(struct SSyncNode* ths, SyncRequestVote* pMsg) { - int32_t ret = ths->pRaft->FpOnRequestVote(ths->pRaft, pMsg); +static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { + int32_t ret = 0; + sTrace("<-- syncNodeOnPingReplyCb -->"); + + { + cJSON* pJson = syncPingReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("syncNodeOnPingReplyCb syncNodePing pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + return ret; } -static int32_t onSyncNodeRequestVoteReply(struct SSyncNode* ths, SyncRequestVoteReply* pMsg) { - int32_t ret = ths->pRaft->FpOnRequestVoteReply(ths->pRaft, pMsg); +static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg) { + int32_t ret = 0; return ret; } -static int32_t doSyncNodeAppendEntries(struct SSyncNode* ths, const SyncAppendEntries* pMsg) { - int32_t ret = ths->pRaft->FpAppendEntries(ths->pRaft, pMsg); +static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg) { + int32_t ret = 0; return ret; } -static int32_t onSyncNodeAppendEntries(struct SSyncNode* ths, SyncAppendEntries* pMsg) { - int32_t ret = ths->pRaft->FpOnAppendEntries(ths->pRaft, pMsg); +static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { + int32_t ret = 0; return ret; } -static int32_t onSyncNodeAppendEntriesReply(struct SSyncNode* ths, SyncAppendEntriesReply* pMsg) { - int32_t ret = ths->pRaft->FpOnAppendEntriesReply(ths->pRaft, pMsg); +static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { + int32_t ret = 0; return ret; +} + +static void syncNodePingTimerCb(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; + if (atomic_load_8(&pSyncNode->pingTimerStart)) { + ++(pSyncNode->pingTimerCounter); + // pSyncNode->pingTimerMS += 100; + + sTrace( + "syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, " + "tmrId:%p ", + pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId); + + syncNodePingAll(pSyncNode); + + taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + &pSyncNode->pPingTimer); + } else { + sTrace("syncNodePingTimerCb: pingTimerStart:%u ", pSyncNode->pingTimerStart); + } } \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 8937303725..a26c8401b2 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -15,5 +15,265 @@ #include "syncMessage.h" #include "syncRaft.h" +#include "syncUtil.h" +#include "tcoding.h" -void onMessage(SRaft *pRaft, void *pMsg) {} \ No newline at end of file +void onMessage(SRaft* pRaft, void* pMsg) {} + +// ---- message process SyncPing---- +SyncPing* syncPingBuild(uint32_t dataLen) { + uint32_t bytes = SYNC_PING_FIX_LEN + dataLen; + SyncPing* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_PING; + pMsg->dataLen = dataLen; +} + +void syncPingDestroy(SyncPing* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); + assert(pMsg->bytes == SYNC_PING_FIX_LEN + pMsg->dataLen); +} + +void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncPingSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) { + syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncPing2Json(const SyncPing* pMsg) { + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", pMsg->data); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPing", pRoot); + return pJson; +} + +SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str) { + uint32_t dataLen = strlen(str) + 1; + SyncPing* pMsg = syncPingBuild(dataLen); + pMsg->srcId = *srcId; + pMsg->destId = *destId; + snprintf(pMsg->data, pMsg->dataLen, "%s", str); + return pMsg; +} + +SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId) { + SyncPing* pMsg = syncPingBuild2(srcId, destId, "ping"); + return pMsg; +} + +// ---- message process SyncPingReply---- +SyncPingReply* syncPingReplyBuild(uint32_t dataLen) { + uint32_t bytes = SYNC_PING_REPLY_FIX_LEN + dataLen; + SyncPingReply* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_PING_REPLY; + pMsg->dataLen = dataLen; +} + +void syncPingReplyDestroy(SyncPingReply* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); + assert(pMsg->bytes == SYNC_PING_FIX_LEN + pMsg->dataLen); +} + +void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncPingReplySerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg) { + syncPingReplyDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncPingReply2Json(const SyncPingReply* pMsg) { + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + + cJSON* pSrcId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pSrcId, "addr", pMsg->srcId.addr); + { + uint64_t u64 = pMsg->srcId.addr; + cJSON* pTmp = pSrcId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId); + cJSON_AddItemToObject(pRoot, "srcId", pSrcId); + + cJSON* pDestId = cJSON_CreateObject(); + cJSON_AddNumberToObject(pDestId, "addr", pMsg->destId.addr); + { + uint64_t u64 = pMsg->destId.addr; + cJSON* pTmp = pDestId; + char host[128]; + uint16_t port; + syncUtilU642Addr(u64, host, sizeof(host), &port); + cJSON_AddStringToObject(pTmp, "addr_host", host); + cJSON_AddNumberToObject(pTmp, "addr_port", port); + } + cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId); + cJSON_AddItemToObject(pRoot, "destId", pDestId); + + cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen); + cJSON_AddStringToObject(pRoot, "data", pMsg->data); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPingReply", pRoot); + return pJson; +} + +SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, const char* str) { + uint32_t dataLen = strlen(str) + 1; + SyncPingReply* pMsg = syncPingReplyBuild(dataLen); + pMsg->srcId = *srcId; + pMsg->destId = *destId; + snprintf(pMsg->data, pMsg->dataLen, "%s", str); + return pMsg; +} + +SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId) { + SyncPingReply* pMsg = syncPingReplyBuild2(srcId, destId, "pang"); + return pMsg; +} + +#if 0 +void syncPingSerialize(const SyncPing* pMsg, char** ppBuf, uint32_t* bufLen) { + *bufLen = sizeof(SyncPing) + pMsg->dataLen; + *ppBuf = (char*)malloc(*bufLen); + void* pStart = *ppBuf; + uint32_t allBytes = *bufLen; + + int len = 0; + len = taosEncodeFixedU32(&pStart, pMsg->msgType); + allBytes -= len; + assert(len > 0); + pStart += len; + + len = taosEncodeFixedU64(&pStart, pMsg->srcId.addr); + allBytes -= len; + assert(len > 0); + pStart += len; + + len = taosEncodeFixedI32(&pStart, pMsg->srcId.vgId); + allBytes -= len; + assert(len > 0); + pStart += len; + + len = taosEncodeFixedU64(&pStart, pMsg->destId.addr); + allBytes -= len; + assert(len > 0); + pStart += len; + + len = taosEncodeFixedI32(&pStart, pMsg->destId.vgId); + allBytes -= len; + assert(len > 0); + pStart += len; + + len = taosEncodeFixedU32(&pStart, pMsg->dataLen); + allBytes -= len; + assert(len > 0); + pStart += len; + + memcpy(pStart, pMsg->data, pMsg->dataLen); + allBytes -= pMsg->dataLen; + assert(allBytes == 0); +} + + +void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) { + void* pStart = (void*)buf; + uint64_t u64; + int32_t i32; + uint32_t u32; + + pStart = taosDecodeFixedU64(pStart, &u64); + pMsg->msgType = u64; + + pStart = taosDecodeFixedU64(pStart, &u64); + pMsg->srcId.addr = u64; + + pStart = taosDecodeFixedI32(pStart, &i32); + pMsg->srcId.vgId = i32; + + pStart = taosDecodeFixedU64(pStart, &u64); + pMsg->destId.addr = u64; + + pStart = taosDecodeFixedI32(pStart, &i32); + pMsg->destId.vgId = i32; + + pStart = taosDecodeFixedU32(pStart, &u32); + pMsg->dataLen = u32; +} +#endif \ No newline at end of file diff --git a/source/libs/sync/src/syncRaft.c b/source/libs/sync/src/syncRaft.c index 9f139730d1..b07c6ea797 100644 --- a/source/libs/sync/src/syncRaft.c +++ b/source/libs/sync/src/syncRaft.c @@ -16,6 +16,8 @@ #include "syncRaft.h" #include "sync.h" +#if 0 + SRaft* raftOpen(SRaftId raftId, SSyncFSM* pFsm) { SRaft* pRaft = (SRaft*)malloc(sizeof(SRaft)); assert(pRaft != NULL); @@ -64,3 +66,5 @@ static int32_t onRaftAppendEntriesReply(struct SRaft* ths, RaftAppendEntriesRepl int32_t raftPropose(SRaft* pRaft, const SSyncBuffer* pBuf, bool isWeak) { return 0; } static int raftSendMsg(SRaftId destRaftId, const void* pMsg, const SRaft* pRaft) { return 0; } + +#endif \ No newline at end of file diff --git a/source/libs/sync/src/syncRaftStore.c b/source/libs/sync/src/syncRaftStore.c index 964cc78490..59c85c38de 100644 --- a/source/libs/sync/src/syncRaftStore.c +++ b/source/libs/sync/src/syncRaftStore.c @@ -18,23 +18,125 @@ // to complie success: FileIO interface is modified -SRaftStore *raftStoreOpen(const char *path) { return NULL;} +SRaftStore *raftStoreOpen(const char *path) { + int32_t ret; -static int32_t raftStoreInit(SRaftStore *pRaftStore) { return 0;} + SRaftStore *pRaftStore = malloc(sizeof(SRaftStore)); + if (pRaftStore == NULL) { + sError("raftStoreOpen malloc error"); + return NULL; + } + memset(pRaftStore, 0, sizeof(*pRaftStore)); + snprintf(pRaftStore->path, sizeof(pRaftStore->path), "%s", path); -int32_t raftStoreClose(SRaftStore *pRaftStore) { return 0;} + char storeBuf[RAFT_STORE_BLOCK_SIZE]; + memset(storeBuf, 0, sizeof(storeBuf)); -int32_t raftStorePersist(SRaftStore *pRaftStore) { return 0;} + if (!raftStoreFileExist(pRaftStore->path)) { + ret = raftStoreInit(pRaftStore); + assert(ret == 0); + } -static bool raftStoreFileExist(char *path) { return 0;} + pRaftStore->pFile = taosOpenFile(path, TD_FILE_READ | TD_FILE_WRITE); + assert(pRaftStore->pFile != NULL); -int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0;} + int len = taosReadFile(pRaftStore->pFile, storeBuf, RAFT_STORE_BLOCK_SIZE); + assert(len == RAFT_STORE_BLOCK_SIZE); -int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { return 0;} + ret = raftStoreDeserialize(pRaftStore, storeBuf, len); + assert(ret == 0); -void raftStorePrint(SRaftStore *pRaftStore) {} + return pRaftStore; +} +static int32_t raftStoreInit(SRaftStore *pRaftStore) { + assert(pRaftStore != NULL); + pRaftStore->pFile = taosOpenFile(pRaftStore->path, TD_FILE_CTEATE | TD_FILE_WRITE); + assert(pRaftStore->pFile != NULL); + + pRaftStore->currentTerm = 0; + pRaftStore->voteFor.addr = 0; + pRaftStore->voteFor.vgId = 0; + + int32_t ret = raftStorePersist(pRaftStore); + assert(ret == 0); + + taosCloseFile(&pRaftStore->pFile); + return 0; +} + +int32_t raftStoreClose(SRaftStore *pRaftStore) { + assert(pRaftStore != NULL); + + taosCloseFile(&pRaftStore->pFile); + free(pRaftStore); + pRaftStore = NULL; + return 0; +} + +int32_t raftStorePersist(SRaftStore *pRaftStore) { + assert(pRaftStore != NULL); + + int32_t ret; + char storeBuf[RAFT_STORE_BLOCK_SIZE]; + ret = raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); + assert(ret == 0); + + taosLSeekFile(pRaftStore->pFile, 0, SEEK_SET); + + ret = taosWriteFile(pRaftStore->pFile, storeBuf, sizeof(storeBuf)); + assert(ret == RAFT_STORE_BLOCK_SIZE); + + taosFsyncFile(pRaftStore->pFile); + return 0; +} + +static bool raftStoreFileExist(char *path) { return taosStatFile(path, NULL, NULL) >= 0; } + +int32_t raftStoreSerialize(SRaftStore *pRaftStore, char *buf, size_t len) { + assert(pRaftStore != NULL); + + cJSON *pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "current_term", pRaftStore->currentTerm); + cJSON_AddNumberToObject(pRoot, "vote_for_addr", pRaftStore->voteFor.addr); + cJSON_AddNumberToObject(pRoot, "vote_for_vgid", pRaftStore->voteFor.vgId); + + char *serialized = cJSON_Print(pRoot); + int len2 = strlen(serialized); + assert(len2 < len); + memset(buf, 0, len); + snprintf(buf, len, "%s", serialized); + free(serialized); + + cJSON_Delete(pRoot); + return 0; +} + +int32_t raftStoreDeserialize(SRaftStore *pRaftStore, char *buf, size_t len) { + assert(pRaftStore != NULL); + + assert(len > 0 && len <= RAFT_STORE_BLOCK_SIZE); + cJSON *pRoot = cJSON_Parse(buf); + + cJSON *pCurrentTerm = cJSON_GetObjectItem(pRoot, "current_term"); + pRaftStore->currentTerm = pCurrentTerm->valueint; + + cJSON *pVoteForAddr = cJSON_GetObjectItem(pRoot, "vote_for_addr"); + pRaftStore->voteFor.addr = pVoteForAddr->valueint; + + cJSON *pVoteForVgid = cJSON_GetObjectItem(pRoot, "vote_for_vgid"); + pRaftStore->voteFor.vgId = pVoteForVgid->valueint; + + cJSON_Delete(pRoot); + return 0; +} + +void raftStorePrint(SRaftStore *pRaftStore) { + char storeBuf[RAFT_STORE_BLOCK_SIZE]; + raftStoreSerialize(pRaftStore, storeBuf, sizeof(storeBuf)); + printf("%s\n", storeBuf); +} #if 0 diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c new file mode 100644 index 0000000000..b4959a810b --- /dev/null +++ b/source/libs/sync/src/syncUtil.c @@ -0,0 +1,90 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "syncUtil.h" +#include +#include +#include + +// ---- encode / decode +uint64_t syncUtilAddr2U64(const char* host, uint16_t port) { + uint64_t u64; + uint32_t hostU32 = (uint32_t)inet_addr(host); + // assert(hostU32 != (uint32_t)-1); + u64 = (((uint64_t)hostU32) << 32) | (((uint32_t)port) << 16); + return u64; +} + +void syncUtilU642Addr(uint64_t u64, char* host, size_t len, uint16_t* port) { + uint32_t hostU32 = (uint32_t)((u64 >> 32) & 0x00000000FFFFFFFF); + + struct in_addr addr; + addr.s_addr = hostU32; + snprintf(host, len, "%s", inet_ntoa(addr)); + *port = (uint16_t)((u64 & 0x00000000FFFF0000) >> 16); +} + +void syncUtilnodeInfo2EpSet(const SNodeInfo* pNodeInfo, SEpSet* pEpSet) { + pEpSet->inUse = 0; + pEpSet->numOfEps = 0; + addEpIntoEpSet(pEpSet, pNodeInfo->nodeFqdn, pNodeInfo->nodePort); +} + +void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet) { + char host[TSDB_FQDN_LEN]; + uint16_t port; + + syncUtilU642Addr(raftId->addr, host, sizeof(host), &port); + + /* + pEpSet->numOfEps = 1; + pEpSet->inUse = 0; + pEpSet->eps[0].port = port; + snprintf(pEpSet->eps[0].fqdn, sizeof(pEpSet->eps[0].fqdn), "%s", host); + */ + pEpSet->inUse = 0; + pEpSet->numOfEps = 0; + addEpIntoEpSet(pEpSet, host, port); +} + +void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId) { + uint32_t ipv4 = taosGetIpv4FromFqdn(pNodeInfo->nodeFqdn); + assert(ipv4 != 0xFFFFFFFF); + char ipbuf[128]; + tinet_ntoa(ipbuf, ipv4); + raftId->addr = syncUtilAddr2U64(ipbuf, pNodeInfo->nodePort); + raftId->vgId = vgId; +} + +// ---- SSyncBuffer ----- +#if 0 +void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len) { + syncBuf->len = len; + syncBuf->data = malloc(syncBuf->len); +} + +void syncUtilbufDestroy(SSyncBuffer* syncBuf) { free(syncBuf->data); } + +void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest) { + dest->len = src->len; + dest->data = src->data; +} + +void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) { + dest->len = src->len; + dest->data = malloc(dest->len); + memcpy(dest->data, src->data, dest->len); +} +#endif \ No newline at end of file diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index e655ac01be..2913d230b2 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -1,6 +1,13 @@ add_executable(syncTest "") add_executable(syncEnvTest "") add_executable(syncPingTest "") +add_executable(syncEncodeTest "") +add_executable(syncIOTickQTest "") +add_executable(syncIOTickPingTest "") +add_executable(syncIOSendMsgTest "") +add_executable(syncIOSendMsgClientTest "") +add_executable(syncIOSendMsgServerTest "") +add_executable(syncRaftStoreTest "") target_sources(syncTest @@ -15,6 +22,34 @@ target_sources(syncPingTest PRIVATE "syncPingTest.cpp" ) +target_sources(syncEncodeTest + PRIVATE + "syncEncodeTest.cpp" +) +target_sources(syncIOTickQTest + PRIVATE + "syncIOTickQTest.cpp" +) +target_sources(syncIOTickPingTest + PRIVATE + "syncIOTickPingTest.cpp" +) +target_sources(syncIOSendMsgTest + PRIVATE + "syncIOSendMsgTest.cpp" +) +target_sources(syncIOSendMsgClientTest + PRIVATE + "syncIOSendMsgClientTest.cpp" +) +target_sources(syncIOSendMsgServerTest + PRIVATE + "syncIOSendMsgServerTest.cpp" +) +target_sources(syncRaftStoreTest + PRIVATE + "syncRaftStoreTest.cpp" +) target_include_directories(syncTest @@ -32,6 +67,41 @@ target_include_directories(syncPingTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncEncodeTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncIOTickQTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncIOTickPingTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncIOSendMsgTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncIOSendMsgClientTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncIOSendMsgServerTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) +target_include_directories(syncRaftStoreTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -46,6 +116,34 @@ target_link_libraries(syncPingTest sync gtest_main ) +target_link_libraries(syncEncodeTest + sync + gtest_main +) +target_link_libraries(syncIOTickQTest + sync + gtest_main +) +target_link_libraries(syncIOTickPingTest + sync + gtest_main +) +target_link_libraries(syncIOSendMsgTest + sync + gtest_main +) +target_link_libraries(syncIOSendMsgClientTest + sync + gtest_main +) +target_link_libraries(syncIOSendMsgServerTest + sync + gtest_main +) +target_link_libraries(syncRaftStoreTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncEncodeTest.cpp b/source/libs/sync/test/syncEncodeTest.cpp new file mode 100644 index 0000000000..4deceb603e --- /dev/null +++ b/source/libs/sync/test/syncEncodeTest.cpp @@ -0,0 +1,184 @@ +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncMessage.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +#define PING_MSG_LEN 20 + +void test1() { + sTrace("test1: ---- syncPingSerialize, syncPingDeserialize"); + + char msg[PING_MSG_LEN]; + snprintf(msg, sizeof(msg), "%s", "test ping"); + SyncPing* pMsg = syncPingBuild(PING_MSG_LEN); + pMsg->srcId.addr = 1; + pMsg->srcId.vgId = 2; + pMsg->destId.addr = 3; + pMsg->destId.vgId = 4; + memcpy(pMsg->data, msg, PING_MSG_LEN); + + { + cJSON* pJson = syncPing2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("SyncPing: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + uint32_t bufLen = pMsg->bytes; + char* buf = (char*)malloc(bufLen); + syncPingSerialize(pMsg, buf, bufLen); + + SyncPing* pMsg2 = (SyncPing*)malloc(pMsg->bytes); + syncPingDeserialize(buf, bufLen, pMsg2); + + { + cJSON* pJson = syncPing2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("SyncPing2: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncPingDestroy(pMsg); + syncPingDestroy(pMsg2); + free(buf); +} + +void test2() { + sTrace("test2: ---- syncPing2RpcMsg, syncPingFromRpcMsg"); + + char msg[PING_MSG_LEN]; + snprintf(msg, sizeof(msg), "%s", "hello raft"); + SyncPing* pMsg = syncPingBuild(PING_MSG_LEN); + pMsg->srcId.addr = 100; + pMsg->srcId.vgId = 200; + pMsg->destId.addr = 300; + pMsg->destId.vgId = 400; + memcpy(pMsg->data, msg, PING_MSG_LEN); + + { + cJSON* pJson = syncPing2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("SyncPing: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + SRpcMsg rpcMsg; + syncPing2RpcMsg(pMsg, &rpcMsg); + SyncPing* pMsg2 = (SyncPing*)malloc(pMsg->bytes); + syncPingFromRpcMsg(&rpcMsg, pMsg2); + rpcFreeCont(rpcMsg.pCont); + + { + cJSON* pJson = syncPing2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("SyncPing2: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncPingDestroy(pMsg); + syncPingDestroy(pMsg2); +} + +void test3() { + sTrace("test3: ---- syncPingReplySerialize, syncPingReplyDeserialize"); + + char msg[PING_MSG_LEN]; + snprintf(msg, sizeof(msg), "%s", "test ping"); + SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN); + pMsg->srcId.addr = 19; + pMsg->srcId.vgId = 29; + pMsg->destId.addr = 39; + pMsg->destId.vgId = 49; + memcpy(pMsg->data, msg, PING_MSG_LEN); + + { + cJSON* pJson = syncPingReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("SyncPingReply: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + uint32_t bufLen = pMsg->bytes; + char* buf = (char*)malloc(bufLen); + syncPingReplySerialize(pMsg, buf, bufLen); + + SyncPingReply* pMsg2 = (SyncPingReply*)malloc(pMsg->bytes); + syncPingReplyDeserialize(buf, bufLen, pMsg2); + + { + cJSON* pJson = syncPingReply2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("SyncPingReply2: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncPingReplyDestroy(pMsg); + syncPingReplyDestroy(pMsg2); + free(buf); +} + +void test4() { + sTrace("test4: ---- syncPingReply2RpcMsg, syncPingReplyFromRpcMsg"); + + char msg[PING_MSG_LEN]; + snprintf(msg, sizeof(msg), "%s", "hello raft"); + SyncPingReply* pMsg = syncPingReplyBuild(PING_MSG_LEN); + pMsg->srcId.addr = 66; + pMsg->srcId.vgId = 77; + pMsg->destId.addr = 88; + pMsg->destId.vgId = 99; + memcpy(pMsg->data, msg, PING_MSG_LEN); + + { + cJSON* pJson = syncPingReply2Json(pMsg); + char* serialized = cJSON_Print(pJson); + printf("SyncPingReply: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pMsg, &rpcMsg); + SyncPingReply* pMsg2 = (SyncPingReply*)malloc(pMsg->bytes); + syncPingReplyFromRpcMsg(&rpcMsg, pMsg2); + rpcFreeCont(rpcMsg.pCont); + + { + cJSON* pJson = syncPingReply2Json(pMsg2); + char* serialized = cJSON_Print(pJson); + printf("SyncPingReply2: \n%s\n\n", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + syncPingReplyDestroy(pMsg); + syncPingReplyDestroy(pMsg2); +} +int main() { + // taosInitLog((char*)"syncPingTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + test1(); + test2(); + test3(); + test4(); + + return 0; +} diff --git a/source/libs/sync/test/syncEnvTest.cpp b/source/libs/sync/test/syncEnvTest.cpp index 1d050e7094..1ac4357c5a 100644 --- a/source/libs/sync/test/syncEnvTest.cpp +++ b/source/libs/sync/test/syncEnvTest.cpp @@ -34,19 +34,20 @@ void doSync() { } int main() { - //taosInitLog((char*)"syncEnvTest.log", 100000, 10); + // taosInitLog((char*)"syncEnvTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; + int32_t ret; logTest(); - int32_t ret = syncIOStart(); - assert(ret == 0); + // ret = syncIOStart(); + // assert(ret == 0); ret = syncEnvStart(); assert(ret == 0); - doSync(); + // doSync(); while (1) { taosMsleep(1000); diff --git a/source/libs/sync/test/syncIOSendMsgClientTest.cpp b/source/libs/sync/test/syncIOSendMsgClientTest.cpp new file mode 100644 index 0000000000..9e8c7c8b65 --- /dev/null +++ b/source/libs/sync/test/syncIOSendMsgClientTest.cpp @@ -0,0 +1,50 @@ +#include +#include "gtest/gtest.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret; + + ret = syncIOStart((char *)"127.0.0.1", 7010); + assert(ret == 0); + + for (int i = 0; i < 10; ++i) { + SEpSet epSet; + epSet.inUse = 0; + epSet.numOfEps = 0; + addEpIntoEpSet(&epSet, "127.0.0.1", 7030); + + SRpcMsg rpcMsg; + rpcMsg.contLen = 64; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + snprintf((char *)rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOSendMsgTest"); + rpcMsg.handle = NULL; + rpcMsg.msgType = 77; + + syncIOSendMsg(gSyncIO->clientRpc, &epSet, &rpcMsg); + sleep(1); + } + + while (1) { + sleep(1); + } + + return 0; +} diff --git a/source/libs/sync/test/syncIOSendMsgServerTest.cpp b/source/libs/sync/test/syncIOSendMsgServerTest.cpp new file mode 100644 index 0000000000..8af9344ed6 --- /dev/null +++ b/source/libs/sync/test/syncIOSendMsgServerTest.cpp @@ -0,0 +1,33 @@ +#include +#include "gtest/gtest.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret; + + ret = syncIOStart((char *)"127.0.0.1", 7030); + assert(ret == 0); + + while (1) { + sleep(1); + } + + return 0; +} diff --git a/source/libs/sync/test/syncIOSendMsgTest.cpp b/source/libs/sync/test/syncIOSendMsgTest.cpp new file mode 100644 index 0000000000..a297981ee5 --- /dev/null +++ b/source/libs/sync/test/syncIOSendMsgTest.cpp @@ -0,0 +1,50 @@ +#include +#include "gtest/gtest.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret; + + ret = syncIOStart((char *)"127.0.0.1", 7010); + assert(ret == 0); + + for (int i = 0; i < 10; ++i) { + SEpSet epSet; + epSet.inUse = 0; + epSet.numOfEps = 0; + addEpIntoEpSet(&epSet, "127.0.0.1", 7010); + + SRpcMsg rpcMsg; + rpcMsg.contLen = 64; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + snprintf((char *)rpcMsg.pCont, rpcMsg.contLen, "%s", "syncIOSendMsgTest"); + rpcMsg.handle = NULL; + rpcMsg.msgType = 77; + + syncIOSendMsg(gSyncIO->clientRpc, &epSet, &rpcMsg); + sleep(1); + } + + while (1) { + sleep(1); + } + + return 0; +} diff --git a/source/libs/sync/test/syncIOTickPingTest.cpp b/source/libs/sync/test/syncIOTickPingTest.cpp new file mode 100644 index 0000000000..1559b57585 --- /dev/null +++ b/source/libs/sync/test/syncIOTickPingTest.cpp @@ -0,0 +1,35 @@ +#include +#include "gtest/gtest.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret; + + ret = syncIOStart((char*)"127.0.0.1", 7010); + assert(ret == 0); + + ret = syncIOTickPing(); + assert(ret == 0); + + while (1) { + sleep(1); + } + return 0; +} diff --git a/source/libs/sync/test/syncIOTickQTest.cpp b/source/libs/sync/test/syncIOTickQTest.cpp new file mode 100644 index 0000000000..90304079e3 --- /dev/null +++ b/source/libs/sync/test/syncIOTickQTest.cpp @@ -0,0 +1,35 @@ +#include +#include "gtest/gtest.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int32_t ret; + + ret = syncIOStart((char*)"127.0.0.1", 7010); + assert(ret == 0); + + ret = syncIOTickQ(); + assert(ret == 0); + + while (1) { + sleep(1); + } + return 0; +} diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index e62d051946..24d9ead5e3 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -13,49 +13,71 @@ void logTest() { sFatal("--- sync log test: fatal"); } -void doSync() { +SSyncNode* doSync() { SSyncFSM* pFsm; SSyncInfo syncInfo; syncInfo.vgId = 1; + syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.FpSendMsg = syncIOSendMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); SSyncCfg* pCfg = &syncInfo.syncCfg; pCfg->myIndex = 0; - pCfg->replicaNum = 3; + pCfg->replicaNum = 2; pCfg->nodeInfo[0].nodePort = 7010; - taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); pCfg->nodeInfo[1].nodePort = 7110; - taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); + snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); pCfg->nodeInfo[2].nodePort = 7210; - taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); assert(pSyncNode != NULL); - gSyncIO->FpOnPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +void timerPingAll(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; + syncNodePingAll(pSyncNode); } int main() { - //taosInitLog((char*)"syncPingTest.log", 100000, 10); + // taosInitLog((char*)"syncPingTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64; logTest(); - int32_t ret = syncIOStart(); + int32_t ret = syncIOStart((char*)"127.0.0.1", 7010); assert(ret == 0); ret = syncEnvStart(); assert(ret == 0); - doSync(); + SSyncNode* pSyncNode = doSync(); + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + + ret = syncNodeStartPingTimer(pSyncNode); + assert(ret == 0); + + /* + taosMsleep(10000); + ret = syncNodeStopPingTimer(pSyncNode); + assert(ret == 0); + */ while (1) { taosMsleep(1000); diff --git a/source/libs/sync/test/syncRaftStoreTest.cpp b/source/libs/sync/test/syncRaftStoreTest.cpp new file mode 100644 index 0000000000..e533b89a92 --- /dev/null +++ b/source/libs/sync/test/syncRaftStoreTest.cpp @@ -0,0 +1,42 @@ +#include "syncRaftStore.h" +#include +#include "gtest/gtest.h" +#include "syncIO.h" +#include "syncInt.h" + +void *pingFunc(void *param) { + SSyncIO *io = (SSyncIO *)param; + while (1) { + sDebug("io->ping"); + // io->ping(io); + sleep(1); + } + return NULL; +} + +int main() { + // taosInitLog((char *)"syncTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + sTrace("sync log test: trace"); + sDebug("sync log test: debug"); + sInfo("sync log test: info"); + sWarn("sync log test: warn"); + sError("sync log test: error"); + sFatal("sync log test: fatal"); + + SRaftStore *pRaftStore = raftStoreOpen("./raft_store.json"); + assert(pRaftStore != NULL); + + raftStorePrint(pRaftStore); + + pRaftStore->currentTerm = 100; + pRaftStore->voteFor.addr = 200; + pRaftStore->voteFor.vgId = 300; + + raftStorePrint(pRaftStore); + raftStorePersist(pRaftStore); + + return 0; +} diff --git a/source/libs/sync/test/syncTest.cpp b/source/libs/sync/test/syncTest.cpp index 0f72fd822f..0b397ef921 100644 --- a/source/libs/sync/test/syncTest.cpp +++ b/source/libs/sync/test/syncTest.cpp @@ -1,21 +1,21 @@ #include +#include "gtest/gtest.h" #include "syncIO.h" #include "syncInt.h" #include "syncRaftStore.h" -#include "gtest/gtest.h" void *pingFunc(void *param) { SSyncIO *io = (SSyncIO *)param; while (1) { sDebug("io->ping"); - io->ping(io); + // io->ping(io); sleep(1); } return NULL; } int main() { - //taosInitLog((char *)"syncTest.log", 100000, 10); + // taosInitLog((char *)"syncTest.log", 100000, 10); tsAsyncLog = 0; sDebugFlag = 143 + 64;