Merge pull request #14513 from taosdata/feature/3.0_mhli
refactor(sync): add sync strategy
This commit is contained in:
commit
c45e0d5112
|
@ -31,6 +31,12 @@ extern bool gRaftDetailLog;
|
||||||
#define SYNC_INDEX_INVALID -1
|
#define SYNC_INDEX_INVALID -1
|
||||||
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
|
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
SYNC_STRATEGY_NO_SNAPSHOT = 0,
|
||||||
|
SYNC_STRATEGY_STANDARD_SNAPSHOT = 1,
|
||||||
|
SYNC_STRATEGY_WAL_FIRST = 2,
|
||||||
|
} ESyncStrategy;
|
||||||
|
|
||||||
typedef uint64_t SyncNodeId;
|
typedef uint64_t SyncNodeId;
|
||||||
typedef int32_t SyncGroupId;
|
typedef int32_t SyncGroupId;
|
||||||
typedef int64_t SyncIndex;
|
typedef int64_t SyncIndex;
|
||||||
|
@ -48,11 +54,6 @@ typedef enum {
|
||||||
TAOS_SYNC_STATE_ERROR = 103,
|
TAOS_SYNC_STATE_ERROR = 103,
|
||||||
} ESyncState;
|
} ESyncState;
|
||||||
|
|
||||||
typedef enum {
|
|
||||||
TAOS_SYNC_FSM_CB_SUCCESS = 0,
|
|
||||||
TAOS_SYNC_FSM_CB_OTHER_ERROR = 1,
|
|
||||||
} ESyncFsmCbCode;
|
|
||||||
|
|
||||||
typedef struct SNodeInfo {
|
typedef struct SNodeInfo {
|
||||||
uint16_t nodePort;
|
uint16_t nodePort;
|
||||||
char nodeFqdn[TSDB_FQDN_LEN];
|
char nodeFqdn[TSDB_FQDN_LEN];
|
||||||
|
@ -96,6 +97,11 @@ typedef struct SReConfigCbMeta {
|
||||||
|
|
||||||
} SReConfigCbMeta;
|
} SReConfigCbMeta;
|
||||||
|
|
||||||
|
typedef struct SSnapshotParam {
|
||||||
|
SyncIndex start;
|
||||||
|
SyncIndex end;
|
||||||
|
} SSnapshotParam;
|
||||||
|
|
||||||
typedef struct SSnapshot {
|
typedef struct SSnapshot {
|
||||||
void* data;
|
void* data;
|
||||||
SyncIndex lastApplyIndex;
|
SyncIndex lastApplyIndex;
|
||||||
|
@ -125,7 +131,7 @@ typedef struct SSyncFSM {
|
||||||
int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader);
|
int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader);
|
||||||
int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
|
int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
|
||||||
|
|
||||||
int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void** ppWriter);
|
int32_t (*FpSnapshotStartWrite)(struct SSyncFSM* pFsm, void* pWriterParam, void** ppWriter);
|
||||||
int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply);
|
int32_t (*FpSnapshotStopWrite)(struct SSyncFSM* pFsm, void* pWriter, bool isApply);
|
||||||
int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len);
|
int32_t (*FpSnapshotDoWrite)(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len);
|
||||||
|
|
||||||
|
@ -179,7 +185,7 @@ typedef struct SSyncLogStore {
|
||||||
|
|
||||||
typedef struct SSyncInfo {
|
typedef struct SSyncInfo {
|
||||||
bool isStandBy;
|
bool isStandBy;
|
||||||
bool snapshotEnable;
|
ESyncStrategy snapshotStrategy;
|
||||||
SyncGroupId vgId;
|
SyncGroupId vgId;
|
||||||
int32_t batchSize;
|
int32_t batchSize;
|
||||||
SSyncCfg syncCfg;
|
SSyncCfg syncCfg;
|
||||||
|
@ -205,7 +211,7 @@ SyncGroupId syncGetVgId(int64_t rid);
|
||||||
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
||||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
||||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
|
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
|
||||||
// int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize);
|
int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize);
|
||||||
bool syncEnvIsStart();
|
bool syncEnvIsStart();
|
||||||
const char* syncStr(ESyncState state);
|
const char* syncStr(ESyncState state);
|
||||||
bool syncIsRestoreFinish(int64_t rid);
|
bool syncIsRestoreFinish(int64_t rid);
|
||||||
|
|
|
@ -191,12 +191,12 @@ void syncTimeoutLog2(char* s, const SyncTimeout* pMsg);
|
||||||
typedef struct SyncClientRequest {
|
typedef struct SyncClientRequest {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
uint32_t msgType; // SyncClientRequest msgType
|
uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST
|
||||||
uint32_t originalRpcType; // user RpcMsg msgType
|
uint32_t originalRpcType; // origin RpcMsg msgType
|
||||||
uint64_t seqNum;
|
uint64_t seqNum;
|
||||||
bool isWeak;
|
bool isWeak;
|
||||||
uint32_t dataLen; // user RpcMsg.contLen
|
uint32_t dataLen; // origin RpcMsg.contLen
|
||||||
char data[]; // user RpcMsg.pCont
|
char data[]; // origin RpcMsg.pCont
|
||||||
} SyncClientRequest;
|
} SyncClientRequest;
|
||||||
|
|
||||||
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen);
|
SyncClientRequest* syncClientRequestBuild(uint32_t dataLen);
|
||||||
|
@ -220,11 +220,6 @@ void syncClientRequestLog(const SyncClientRequest* pMsg);
|
||||||
void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg);
|
void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg);
|
||||||
|
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
typedef struct SOffsetAndContLen {
|
|
||||||
int32_t offset;
|
|
||||||
int32_t contLen;
|
|
||||||
} SOffsetAndContLen;
|
|
||||||
|
|
||||||
typedef struct SRaftMeta {
|
typedef struct SRaftMeta {
|
||||||
uint64_t seqNum;
|
uint64_t seqNum;
|
||||||
bool isWeak;
|
bool isWeak;
|
||||||
|
@ -232,20 +227,33 @@ typedef struct SRaftMeta {
|
||||||
|
|
||||||
// block1:
|
// block1:
|
||||||
// block2: SRaftMeta array
|
// block2: SRaftMeta array
|
||||||
// block3: rpc msg array (with pCont)
|
// block3: rpc msg array (with pCont pointer)
|
||||||
|
|
||||||
typedef struct SyncClientRequestBatch {
|
typedef struct SyncClientRequestBatch {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
uint32_t msgType; // SyncClientRequestBatch msgType
|
uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST_BATCH
|
||||||
uint32_t dataCount;
|
uint32_t dataCount;
|
||||||
uint32_t dataLen; // user RpcMsg.contLen
|
uint32_t dataLen;
|
||||||
char data[]; // user RpcMsg.pCont
|
char data[]; // block2, block3
|
||||||
} SyncClientRequestBatch;
|
} SyncClientRequestBatch;
|
||||||
|
|
||||||
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize,
|
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize,
|
||||||
int32_t vgId);
|
int32_t vgId);
|
||||||
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg);
|
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg);
|
||||||
|
void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg);
|
||||||
|
void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg);
|
||||||
|
SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg);
|
||||||
|
SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg);
|
||||||
|
SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg);
|
||||||
|
cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg);
|
||||||
|
char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg);
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg);
|
||||||
|
void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg);
|
||||||
|
void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg);
|
||||||
|
void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg);
|
||||||
|
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
typedef struct SyncClientRequestReply {
|
typedef struct SyncClientRequestReply {
|
||||||
|
@ -318,12 +326,15 @@ void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg);
|
||||||
void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg);
|
void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg);
|
||||||
|
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
|
// data: entry
|
||||||
|
|
||||||
typedef struct SyncAppendEntries {
|
typedef struct SyncAppendEntries {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
int32_t vgId;
|
int32_t vgId;
|
||||||
uint32_t msgType;
|
uint32_t msgType;
|
||||||
SRaftId srcId;
|
SRaftId srcId;
|
||||||
SRaftId destId;
|
SRaftId destId;
|
||||||
|
|
||||||
// private data
|
// private data
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
SyncIndex prevLogIndex;
|
SyncIndex prevLogIndex;
|
||||||
|
@ -354,18 +365,14 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg);
|
||||||
|
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
|
|
||||||
// define ahead
|
|
||||||
/*
|
|
||||||
typedef struct SOffsetAndContLen {
|
typedef struct SOffsetAndContLen {
|
||||||
int32_t offset;
|
int32_t offset;
|
||||||
int32_t contLen;
|
int32_t contLen;
|
||||||
} SOffsetAndContLen;
|
} SOffsetAndContLen;
|
||||||
*/
|
|
||||||
|
|
||||||
// block1: SOffsetAndContLen
|
// data:
|
||||||
// block2: SOffsetAndContLen Array
|
// block1: SOffsetAndContLen Array
|
||||||
// block3: SRpcMsg Array
|
// block2: entry Array
|
||||||
// block4: SRpcMsg pCont Array
|
|
||||||
|
|
||||||
typedef struct SyncAppendEntriesBatch {
|
typedef struct SyncAppendEntriesBatch {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
|
@ -382,10 +389,11 @@ typedef struct SyncAppendEntriesBatch {
|
||||||
SyncTerm privateTerm;
|
SyncTerm privateTerm;
|
||||||
int32_t dataCount;
|
int32_t dataCount;
|
||||||
uint32_t dataLen;
|
uint32_t dataLen;
|
||||||
char data[];
|
char data[]; // block1, block2
|
||||||
} SyncAppendEntriesBatch;
|
} SyncAppendEntriesBatch;
|
||||||
|
|
||||||
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId);
|
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId);
|
||||||
|
SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg);
|
||||||
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg);
|
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg);
|
||||||
void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen);
|
void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen);
|
||||||
void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg);
|
void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg);
|
||||||
|
@ -396,8 +404,6 @@ void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg,
|
||||||
SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||||
cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg);
|
cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg);
|
||||||
char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg);
|
char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg);
|
||||||
void syncAppendEntriesBatch2RpcMsgArray(SyncAppendEntriesBatch* pSyncMsg, SRpcMsg* rpcMsgArr, int32_t maxArrSize,
|
|
||||||
int32_t* pRetArrSize);
|
|
||||||
|
|
||||||
// for debug ----------------------
|
// for debug ----------------------
|
||||||
void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg);
|
void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg);
|
||||||
|
@ -477,9 +483,10 @@ typedef struct SyncSnapshotSend {
|
||||||
SRaftId destId;
|
SRaftId destId;
|
||||||
|
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
SyncIndex lastIndex; // lastIndex of snapshot
|
SyncIndex beginIndex; // snapshot.beginIndex
|
||||||
SyncTerm lastTerm; // lastTerm of snapshot
|
SyncIndex lastIndex; // snapshot.lastIndex
|
||||||
SyncIndex lastConfigIndex;
|
SyncTerm lastTerm; // snapshot.lastTerm
|
||||||
|
SyncIndex lastConfigIndex; // snapshot.lastConfigIndex
|
||||||
SSyncCfg lastConfig;
|
SSyncCfg lastConfig;
|
||||||
SyncTerm privateTerm;
|
SyncTerm privateTerm;
|
||||||
int32_t seq;
|
int32_t seq;
|
||||||
|
@ -617,6 +624,9 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl
|
||||||
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg);
|
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg);
|
||||||
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
||||||
|
|
||||||
|
int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg);
|
||||||
|
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
||||||
|
|
||||||
int32_t syncNodeOnSnapshotSendCb(SSyncNode* ths, SyncSnapshotSend* pMsg);
|
int32_t syncNodeOnSnapshotSendCb(SSyncNode* ths, SyncSnapshotSend* pMsg);
|
||||||
int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg);
|
int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg);
|
||||||
|
|
||||||
|
@ -634,6 +644,7 @@ typedef int32_t (*FpOnSnapshotRspCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg);
|
||||||
|
|
||||||
// option ----------------------------------
|
// option ----------------------------------
|
||||||
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
||||||
|
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
|
||||||
|
|
||||||
// ---------------------------------------------
|
// ---------------------------------------------
|
||||||
|
|
||||||
|
|
|
@ -428,6 +428,7 @@ int32_t* taosGetErrno();
|
||||||
#define TSDB_CODE_SYN_PROPOSE_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0911)
|
#define TSDB_CODE_SYN_PROPOSE_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0911)
|
||||||
#define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912)
|
#define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912)
|
||||||
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
|
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
|
||||||
|
#define TSDB_CODE_SYN_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x0914)
|
||||||
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
|
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
|
||||||
|
|
||||||
// tq
|
// tq
|
||||||
|
|
|
@ -58,7 +58,7 @@ static void *mndBuildTimerMsg(int32_t *pContLen) {
|
||||||
|
|
||||||
static void mndPullupTrans(SMnode *pMnode) {
|
static void mndPullupTrans(SMnode *pMnode) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void * pReq = mndBuildTimerMsg(&contLen);
|
void *pReq = mndBuildTimerMsg(&contLen);
|
||||||
if (pReq != NULL) {
|
if (pReq != NULL) {
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TRANS_TIMER, .pCont = pReq, .contLen = contLen};
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
|
@ -67,14 +67,14 @@ static void mndPullupTrans(SMnode *pMnode) {
|
||||||
|
|
||||||
static void mndTtlTimer(SMnode *pMnode) {
|
static void mndTtlTimer(SMnode *pMnode) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void * pReq = mndBuildTimerMsg(&contLen);
|
void *pReq = mndBuildTimerMsg(&contLen);
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TTL_TIMER, .pCont = pReq, .contLen = contLen};
|
||||||
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, WRITE_QUEUE, &rpcMsg);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void mndCalMqRebalance(SMnode *pMnode) {
|
static void mndCalMqRebalance(SMnode *pMnode) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void * pReq = mndBuildTimerMsg(&contLen);
|
void *pReq = mndBuildTimerMsg(&contLen);
|
||||||
if (pReq != NULL) {
|
if (pReq != NULL) {
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_MQ_TIMER, .pCont = pReq, .contLen = contLen};
|
||||||
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
||||||
|
@ -83,7 +83,7 @@ static void mndCalMqRebalance(SMnode *pMnode) {
|
||||||
|
|
||||||
static void mndPullupTelem(SMnode *pMnode) {
|
static void mndPullupTelem(SMnode *pMnode) {
|
||||||
int32_t contLen = 0;
|
int32_t contLen = 0;
|
||||||
void * pReq = mndBuildTimerMsg(&contLen);
|
void *pReq = mndBuildTimerMsg(&contLen);
|
||||||
if (pReq != NULL) {
|
if (pReq != NULL) {
|
||||||
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
|
SRpcMsg rpcMsg = {.msgType = TDMT_MND_TELEM_TIMER, .pCont = pReq, .contLen = contLen};
|
||||||
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
tmsgPutToQueue(&pMnode->msgCb, READ_QUEUE, &rpcMsg);
|
||||||
|
@ -395,7 +395,7 @@ void mndStop(SMnode *pMnode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
SMnode * pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
SSyncMgmt *pMgmt = &pMnode->syncMgmt;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
|
|
||||||
|
@ -413,7 +413,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
do {
|
do {
|
||||||
char * syncNodeStr = sync2SimpleStr(pMgmt->sync);
|
char *syncNodeStr = sync2SimpleStr(pMgmt->sync);
|
||||||
static int64_t mndTick = 0;
|
static int64_t mndTick = 0;
|
||||||
if (++mndTick % 10 == 1) {
|
if (++mndTick % 10 == 1) {
|
||||||
mTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pMgmt->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
|
mTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pMgmt->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||||
|
@ -427,7 +427,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
// ToDo: ugly! use function pointer
|
// ToDo: ugly! use function pointer
|
||||||
if (syncNodeSnapshotEnable(pSyncNode)) {
|
if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_STANDARD_SNAPSHOT) {
|
||||||
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
||||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
|
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
|
||||||
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
||||||
|
@ -579,7 +579,7 @@ static int32_t mndCheckMsgContent(SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
|
int32_t mndProcessRpcMsg(SRpcMsg *pMsg) {
|
||||||
SMnode * pMnode = pMsg->info.node;
|
SMnode *pMnode = pMsg->info.node;
|
||||||
const STraceId *trace = &pMsg->info.traceId;
|
const STraceId *trace = &pMsg->info.traceId;
|
||||||
|
|
||||||
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
|
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(pMsg->msgType)];
|
||||||
|
@ -632,7 +632,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
||||||
SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
|
SMonStbInfo *pStbInfo, SMonGrantInfo *pGrantInfo) {
|
||||||
if (mndAcquireRpcRef(pMnode) != 0) return -1;
|
if (mndAcquireRpcRef(pMnode) != 0) return -1;
|
||||||
|
|
||||||
SSdb * pSdb = pMnode->pSdb;
|
SSdb *pSdb = pMnode->pSdb;
|
||||||
int64_t ms = taosGetTimestampMs();
|
int64_t ms = taosGetTimestampMs();
|
||||||
|
|
||||||
pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
|
pClusterInfo->dnodes = taosArrayInit(sdbGetSize(pSdb, SDB_DNODE), sizeof(SMonDnodeDesc));
|
||||||
|
@ -713,7 +713,7 @@ int32_t mndGetMonitorInfo(SMnode *pMnode, SMonClusterInfo *pClusterInfo, SMonVgr
|
||||||
pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
|
pGrantInfo->timeseries_used += pVgroup->numOfTimeSeries;
|
||||||
tstrncpy(desc.status, "unsynced", sizeof(desc.status));
|
tstrncpy(desc.status, "unsynced", sizeof(desc.status));
|
||||||
for (int32_t i = 0; i < pVgroup->replica; ++i) {
|
for (int32_t i = 0; i < pVgroup->replica; ++i) {
|
||||||
SVnodeGid * pVgid = &pVgroup->vnodeGid[i];
|
SVnodeGid *pVgid = &pVgroup->vnodeGid[i];
|
||||||
SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
|
SMonVnodeDesc *pVnDesc = &desc.vnodes[i];
|
||||||
pVnDesc->dnode_id = pVgid->dnodeId;
|
pVnDesc->dnode_id = pVgid->dnodeId;
|
||||||
tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));
|
tstrncpy(pVnDesc->vnode_role, syncStr(pVgid->role), sizeof(pVnDesc->vnode_role));
|
||||||
|
|
|
@ -134,7 +134,7 @@ int32_t mndSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, in
|
||||||
return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len);
|
return sdbDoRead(pMnode->pSdb, pReader, ppBuf, len);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) {
|
int32_t mndSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) {
|
||||||
mInfo("start to apply snapshot to sdb");
|
mInfo("start to apply snapshot to sdb");
|
||||||
SMnode *pMnode = pFsm->data;
|
SMnode *pMnode = pFsm->data;
|
||||||
return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
|
return sdbStartWrite(pMnode->pSdb, (SSdbIter **)ppWriter);
|
||||||
|
@ -178,7 +178,7 @@ int32_t mndInitSync(SMnode *pMnode) {
|
||||||
syncInfo.pWal = pMnode->pWal;
|
syncInfo.pWal = pMnode->pWal;
|
||||||
syncInfo.pFsm = mndSyncMakeFsm(pMnode);
|
syncInfo.pFsm = mndSyncMakeFsm(pMnode);
|
||||||
syncInfo.isStandBy = pMgmt->standby;
|
syncInfo.isStandBy = pMgmt->standby;
|
||||||
syncInfo.snapshotEnable = true;
|
syncInfo.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT;
|
||||||
|
|
||||||
mInfo("start to open mnode sync, standby:%d", pMgmt->standby);
|
mInfo("start to open mnode sync, standby:%d", pMgmt->standby);
|
||||||
if (pMgmt->standby || pMgmt->replica.id > 0) {
|
if (pMgmt->standby || pMgmt->replica.id > 0) {
|
||||||
|
|
|
@ -256,59 +256,54 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
|
|
||||||
SRpcMsg *pRpcMsg = pMsg;
|
SRpcMsg *pRpcMsg = pMsg;
|
||||||
|
|
||||||
|
// ToDo: ugly! use function pointer
|
||||||
|
// use different strategy
|
||||||
|
if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) {
|
||||||
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
||||||
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
|
||||||
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
||||||
syncTimeoutDestroy(pSyncMsg);
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
|
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
|
||||||
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
|
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
|
||||||
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
||||||
syncPingDestroy(pSyncMsg);
|
syncPingDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
||||||
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
|
||||||
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
||||||
syncPingReplyDestroy(pSyncMsg);
|
syncPingReplyDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
|
||||||
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
|
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
|
||||||
syncClientRequestDestroy(pSyncMsg);
|
syncClientRequestDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
|
||||||
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
||||||
syncRequestVoteDestroy(pSyncMsg);
|
syncRequestVoteDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
||||||
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
|
||||||
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
||||||
syncRequestVoteReplyDestroy(pSyncMsg);
|
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) {
|
||||||
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
|
SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
|
||||||
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
ret = syncNodeOnAppendEntriesCb(pSyncNode, pSyncMsg);
|
||||||
syncAppendEntriesDestroy(pSyncMsg);
|
syncAppendEntriesDestroy(pSyncMsg);
|
||||||
|
|
||||||
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
||||||
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
||||||
assert(pSyncMsg != NULL);
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
|
||||||
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
ret = syncNodeOnAppendEntriesReplyCb(pSyncNode, pSyncMsg);
|
||||||
syncAppendEntriesReplyDestroy(pSyncMsg);
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
@ -322,6 +317,74 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
ret = -1;
|
ret = -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
} else {
|
||||||
|
// use wal first strategy
|
||||||
|
|
||||||
|
if (pRpcMsg->msgType == TDMT_SYNC_TIMEOUT) {
|
||||||
|
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pRpcMsg);
|
||||||
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
ret = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg);
|
||||||
|
syncTimeoutDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_PING) {
|
||||||
|
SyncPing *pSyncMsg = syncPingFromRpcMsg2(pRpcMsg);
|
||||||
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
ret = syncNodeOnPingCb(pSyncNode, pSyncMsg);
|
||||||
|
syncPingDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_PING_REPLY) {
|
||||||
|
SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pRpcMsg);
|
||||||
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
ret = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg);
|
||||||
|
syncPingReplyDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
|
||||||
|
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
||||||
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
|
||||||
|
syncClientRequestDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
|
||||||
|
SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pRpcMsg);
|
||||||
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
ret = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
|
||||||
|
syncClientRequestBatchDestroyDeep(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||||
|
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
||||||
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
ret = syncNodeOnRequestVoteCb(pSyncNode, pSyncMsg);
|
||||||
|
syncRequestVoteDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) {
|
||||||
|
SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pRpcMsg);
|
||||||
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
ret = syncNodeOnRequestVoteReplyCb(pSyncNode, pSyncMsg);
|
||||||
|
syncRequestVoteReplyDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_BATCH) {
|
||||||
|
SyncAppendEntriesBatch *pSyncMsg = syncAppendEntriesBatchFromRpcMsg2(pRpcMsg);
|
||||||
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
ret = syncNodeOnAppendEntriesSnapshot2Cb(pSyncNode, pSyncMsg);
|
||||||
|
syncAppendEntriesBatchDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) {
|
||||||
|
SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pRpcMsg);
|
||||||
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
ret = syncNodeOnAppendEntriesReplySnapshot2Cb(pSyncNode, pSyncMsg);
|
||||||
|
syncAppendEntriesReplyDestroy(pSyncMsg);
|
||||||
|
|
||||||
|
} else if (pRpcMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
|
||||||
|
ret = vnodeSetStandBy(pVnode);
|
||||||
|
if (ret != 0 && terrno != 0) ret = terrno;
|
||||||
|
SRpcMsg rsp = {.code = ret, .info = pMsg->info};
|
||||||
|
tmsgSendRsp(&rsp);
|
||||||
|
} else {
|
||||||
|
vError("==vnodeProcessSyncReq== error msg type:%d", pRpcMsg->msgType);
|
||||||
|
ret = -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
syncNodeRelease(pSyncNode);
|
syncNodeRelease(pSyncNode);
|
||||||
} else {
|
} else {
|
||||||
vError("==vnodeProcessSyncReq== error syncEnv stop");
|
vError("==vnodeProcessSyncReq== error syncEnv stop");
|
||||||
|
@ -415,7 +478,7 @@ static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { ret
|
||||||
|
|
||||||
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; }
|
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; }
|
||||||
|
|
||||||
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void **ppWriter) { return 0; }
|
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { return 0; }
|
||||||
|
|
||||||
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; }
|
static int32_t vnodeSnapshotStopWrite(struct SSyncFSM *pFsm, void *pWriter, bool isApply) { return 0; }
|
||||||
|
|
||||||
|
@ -442,7 +505,8 @@ static SSyncFSM *vnodeSyncMakeFsm(SVnode *pVnode) {
|
||||||
|
|
||||||
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
int32_t vnodeSyncOpen(SVnode *pVnode, char *path) {
|
||||||
SSyncInfo syncInfo = {
|
SSyncInfo syncInfo = {
|
||||||
.snapshotEnable = false,
|
.snapshotStrategy = SYNC_STRATEGY_NO_SNAPSHOT,
|
||||||
|
.batchSize = 10,
|
||||||
.vgId = pVnode->config.vgId,
|
.vgId = pVnode->config.vgId,
|
||||||
.isStandBy = pVnode->config.standby,
|
.isStandBy = pVnode->config.standby,
|
||||||
.syncCfg = pVnode->config.syncCfg,
|
.syncCfg = pVnode->config.syncCfg,
|
||||||
|
|
|
@ -44,11 +44,6 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
|
||||||
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
||||||
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
||||||
|
|
||||||
typedef struct SReaderParam {
|
|
||||||
SyncIndex start;
|
|
||||||
SyncIndex end;
|
|
||||||
} SReaderParam;
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -175,6 +175,7 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pI
|
||||||
|
|
||||||
// option
|
// option
|
||||||
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
||||||
|
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
|
||||||
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex);
|
SyncIndex syncNodeGetSnapshotConfigIndex(SSyncNode* pSyncNode, SyncIndex snapshotLastApplyIndex);
|
||||||
|
|
||||||
// ping --------------
|
// ping --------------
|
||||||
|
|
|
@ -29,19 +29,20 @@ extern "C" {
|
||||||
|
|
||||||
typedef struct SSyncRaftEntry {
|
typedef struct SSyncRaftEntry {
|
||||||
uint32_t bytes;
|
uint32_t bytes;
|
||||||
uint32_t msgType; // SyncClientRequest msgType
|
uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST
|
||||||
uint32_t originalRpcType; // user RpcMsg msgType
|
uint32_t originalRpcType; // origin RpcMsg msgType
|
||||||
uint64_t seqNum;
|
uint64_t seqNum;
|
||||||
bool isWeak;
|
bool isWeak;
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
SyncIndex index;
|
SyncIndex index;
|
||||||
uint32_t dataLen; // user RpcMsg.contLen
|
uint32_t dataLen; // origin RpcMsg.contLen
|
||||||
char data[]; // user RpcMsg.pCont
|
char data[]; // origin RpcMsg.pCont
|
||||||
} SSyncRaftEntry;
|
} SSyncRaftEntry;
|
||||||
|
|
||||||
SSyncRaftEntry* syncEntryBuild(uint32_t dataLen);
|
SSyncRaftEntry* syncEntryBuild(uint32_t dataLen);
|
||||||
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4
|
SSyncRaftEntry* syncEntryBuild2(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index); // step 4
|
||||||
SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index);
|
SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncIndex index);
|
||||||
|
SSyncRaftEntry* syncEntryBuild4(SRpcMsg* pOriginalMsg, SyncTerm term, SyncIndex index);
|
||||||
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId);
|
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId);
|
||||||
void syncEntryDestory(SSyncRaftEntry* pEntry);
|
void syncEntryDestory(SSyncRaftEntry* pEntry);
|
||||||
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5
|
char* syncEntrySerialize(const SSyncRaftEntry* pEntry, uint32_t* len); // step 5
|
||||||
|
|
|
@ -54,6 +54,7 @@ extern "C" {
|
||||||
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
|
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode);
|
int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode);
|
int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode);
|
||||||
|
|
||||||
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
||||||
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
|
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
|
||||||
int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg);
|
int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg);
|
||||||
|
|
|
@ -43,6 +43,7 @@ typedef struct SSyncSnapshotSender {
|
||||||
void *pReader;
|
void *pReader;
|
||||||
void *pCurrentBlock;
|
void *pCurrentBlock;
|
||||||
int32_t blockLen;
|
int32_t blockLen;
|
||||||
|
SSnapshotParam snapshotParam;
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
SSyncCfg lastConfig;
|
SSyncCfg lastConfig;
|
||||||
int64_t sendingMS;
|
int64_t sendingMS;
|
||||||
|
@ -56,7 +57,8 @@ typedef struct SSyncSnapshotSender {
|
||||||
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
|
SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaIndex);
|
||||||
void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
|
void snapshotSenderDestroy(SSyncSnapshotSender *pSender);
|
||||||
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
|
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender);
|
||||||
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader);
|
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapshotParam, SSnapshot snapshot,
|
||||||
|
void *pReader);
|
||||||
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
|
int32_t snapshotSenderStop(SSyncSnapshotSender *pSender, bool finish);
|
||||||
int32_t snapshotSend(SSyncSnapshotSender *pSender);
|
int32_t snapshotSend(SSyncSnapshotSender *pSender);
|
||||||
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
|
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
|
||||||
|
@ -72,6 +74,7 @@ typedef struct SSyncSnapshotReceiver {
|
||||||
void *pWriter;
|
void *pWriter;
|
||||||
SyncTerm term;
|
SyncTerm term;
|
||||||
SyncTerm privateTerm;
|
SyncTerm privateTerm;
|
||||||
|
SSnapshotParam snapshotParam;
|
||||||
SSnapshot snapshot;
|
SSnapshot snapshot;
|
||||||
SRaftId fromId;
|
SRaftId fromId;
|
||||||
SSyncNode *pSyncNode;
|
SSyncNode *pSyncNode;
|
||||||
|
|
|
@ -628,8 +628,6 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
static int32_t syncNodeMakeLogSame2(SSyncNode* ths, SyncAppendEntriesBatch* pMsg) { return 0; }
|
|
||||||
|
|
||||||
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
int32_t code;
|
int32_t code;
|
||||||
|
|
||||||
|
@ -675,6 +673,51 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) {
|
||||||
|
int32_t code;
|
||||||
|
|
||||||
|
SyncIndex delBegin = FromIndex;
|
||||||
|
SyncIndex delEnd = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
|
||||||
|
|
||||||
|
// invert roll back!
|
||||||
|
for (SyncIndex index = delEnd; index >= delBegin; --index) {
|
||||||
|
if (ths->pFsm->FpRollBackCb != NULL) {
|
||||||
|
SSyncRaftEntry* pRollBackEntry;
|
||||||
|
code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, index, &pRollBackEntry);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
ASSERT(pRollBackEntry != NULL);
|
||||||
|
|
||||||
|
if (syncUtilUserRollback(pRollBackEntry->msgType)) {
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncEntry2OriginalRpc(pRollBackEntry, &rpcMsg);
|
||||||
|
|
||||||
|
SFsmCbMeta cbMeta = {0};
|
||||||
|
cbMeta.index = pRollBackEntry->index;
|
||||||
|
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
|
||||||
|
cbMeta.isWeak = pRollBackEntry->isWeak;
|
||||||
|
cbMeta.code = 0;
|
||||||
|
cbMeta.state = ths->state;
|
||||||
|
cbMeta.seqNum = pRollBackEntry->seqNum;
|
||||||
|
ths->pFsm->FpRollBackCb(ths->pFsm, &rpcMsg, cbMeta);
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
}
|
||||||
|
|
||||||
|
syncEntryDestory(pRollBackEntry);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// delete confict entries
|
||||||
|
code = ths->pLogStore->syncLogTruncate(ths->pLogStore, delBegin);
|
||||||
|
ASSERT(code == 0);
|
||||||
|
|
||||||
|
char eventLog[128];
|
||||||
|
snprintf(eventLog, sizeof(eventLog), "log truncate, from %ld to %ld", delBegin, delEnd);
|
||||||
|
syncNodeEventLog(ths, eventLog);
|
||||||
|
logStoreSimpleLog2("after syncNodeMakeLogSame", ths->pLogStore);
|
||||||
|
|
||||||
|
return code;
|
||||||
|
}
|
||||||
|
|
||||||
static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
|
static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
syncEntry2OriginalRpc(pEntry, &rpcMsg);
|
||||||
|
@ -694,6 +737,31 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool syncNodeOnAppendEntriesBatchLogOK(SSyncNode* pSyncNode, SyncAppendEntriesBatch* pMsg) {
|
||||||
|
if (pMsg->prevLogIndex == SYNC_INDEX_INVALID) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncIndex myLastIndex = syncNodeGetLastIndex(pSyncNode);
|
||||||
|
if (pMsg->prevLogIndex > myLastIndex) {
|
||||||
|
sDebug("vgId:%d sync log not ok, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncTerm myPreLogTerm = syncNodeGetPreTerm(pSyncNode, pMsg->prevLogIndex + 1);
|
||||||
|
if (myPreLogTerm == SYNC_TERM_INVALID) {
|
||||||
|
sDebug("vgId:%d sync log not ok2, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (pMsg->prevLogIndex <= myLastIndex && pMsg->prevLogTerm == myPreLogTerm) {
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
sDebug("vgId:%d sync log not ok3, preindex:%ld", pSyncNode->vgId, pMsg->prevLogIndex);
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
// really pre log match
|
// really pre log match
|
||||||
// prevLogIndex == -1
|
// prevLogIndex == -1
|
||||||
static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) {
|
static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries* pMsg) {
|
||||||
|
@ -767,7 +835,6 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
// operation:
|
// operation:
|
||||||
// if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry
|
// if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry
|
||||||
// match my-commit-index or my-commit-index + 1
|
// match my-commit-index or my-commit-index + 1
|
||||||
// no operation on log
|
|
||||||
do {
|
do {
|
||||||
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
|
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
|
||||||
(pMsg->prevLogIndex <= ths->commitIndex);
|
(pMsg->prevLogIndex <= ths->commitIndex);
|
||||||
|
@ -782,12 +849,9 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
|
|
||||||
SyncIndex matchIndex = ths->commitIndex;
|
SyncIndex matchIndex = ths->commitIndex;
|
||||||
bool hasAppendEntries = pMsg->dataLen > 0;
|
bool hasAppendEntries = pMsg->dataLen > 0;
|
||||||
if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) {
|
SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg);
|
||||||
SRpcMsg rpcMsgArr[SYNC_MAX_BATCH_SIZE];
|
|
||||||
memset(rpcMsgArr, 0, sizeof(rpcMsgArr));
|
|
||||||
int32_t retArrSize = 0;
|
|
||||||
syncAppendEntriesBatch2RpcMsgArray(pMsg, rpcMsgArr, SYNC_MAX_BATCH_SIZE, &retArrSize);
|
|
||||||
|
|
||||||
|
if (hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex) {
|
||||||
// make log same
|
// make log same
|
||||||
do {
|
do {
|
||||||
SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
|
SyncIndex logLastIndex = ths->pLogStore->syncLogLastIndex(ths->pLogStore);
|
||||||
|
@ -795,15 +859,15 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
|
|
||||||
if (hasExtraEntries) {
|
if (hasExtraEntries) {
|
||||||
// make log same, rollback deleted entries
|
// make log same, rollback deleted entries
|
||||||
code = syncNodeMakeLogSame2(ths, pMsg);
|
code = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
// append entry batch
|
// append entry batch
|
||||||
for (int32_t i = 0; i < retArrSize; ++i) {
|
for (int32_t i = 0; i < pMsg->dataCount; ++i) {
|
||||||
SSyncRaftEntry* pAppendEntry = syncEntryBuild(1234);
|
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
|
||||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -821,7 +885,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
walFsync(pWal, true);
|
walFsync(pWal, true);
|
||||||
|
|
||||||
// update match index
|
// update match index
|
||||||
matchIndex = pMsg->prevLogIndex + retArrSize;
|
matchIndex = pMsg->prevLogIndex + pMsg->dataCount;
|
||||||
}
|
}
|
||||||
|
|
||||||
// prepare response msg
|
// prepare response msg
|
||||||
|
@ -839,13 +903,12 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||||
syncAppendEntriesReplyDestroy(pReply);
|
syncAppendEntriesReplyDestroy(pReply);
|
||||||
|
|
||||||
return ret;
|
return 0;
|
||||||
}
|
}
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
// calculate logOK here, before will coredump, due to fake match
|
// calculate logOK here, before will coredump, due to fake match
|
||||||
// bool logOK = syncNodeOnAppendEntriesLogOK(ths, pMsg);
|
bool logOK = syncNodeOnAppendEntriesBatchLogOK(ths, pMsg);
|
||||||
bool logOK = true;
|
|
||||||
|
|
||||||
// not match
|
// not match
|
||||||
//
|
//
|
||||||
|
@ -866,8 +929,9 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
|
|
||||||
if (condition) {
|
if (condition) {
|
||||||
char logBuf[128];
|
char logBuf[128];
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, not match, pre-index:%ld, pre-term:%lu, datalen:%d",
|
snprintf(logBuf, sizeof(logBuf),
|
||||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
"recv sync-append-entries-batch, not match, pre-index:%ld, pre-term:%lu, datalen:%d", pMsg->prevLogIndex,
|
||||||
|
pMsg->prevLogTerm, pMsg->dataLen);
|
||||||
syncNodeEventLog(ths, logBuf);
|
syncNodeEventLog(ths, logBuf);
|
||||||
|
|
||||||
// prepare response msg
|
// prepare response msg
|
||||||
|
@ -885,7 +949,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
syncNodeSendMsgById(&pReply->destId, ths, &rpcMsg);
|
||||||
syncAppendEntriesReplyDestroy(pReply);
|
syncAppendEntriesReplyDestroy(pReply);
|
||||||
|
|
||||||
return ret;
|
return 0;
|
||||||
}
|
}
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
|
@ -906,27 +970,25 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
|
|
||||||
// has entries in SyncAppendEntries msg
|
// has entries in SyncAppendEntries msg
|
||||||
bool hasAppendEntries = pMsg->dataLen > 0;
|
bool hasAppendEntries = pMsg->dataLen > 0;
|
||||||
|
SOffsetAndContLen* metaTableArr = syncAppendEntriesBatchMetaTableArray(pMsg);
|
||||||
|
|
||||||
|
do {
|
||||||
char logBuf[128];
|
char logBuf[128];
|
||||||
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, match, pre-index:%ld, pre-term:%lu, datalen:%d",
|
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries, match, pre-index:%ld, pre-term:%lu, datalen:%d",
|
||||||
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
|
||||||
syncNodeEventLog(ths, logBuf);
|
syncNodeEventLog(ths, logBuf);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
if (hasExtraEntries) {
|
if (hasExtraEntries) {
|
||||||
// make log same, rollback deleted entries
|
// make log same, rollback deleted entries
|
||||||
// code = syncNodeMakeLogSame(ths, pMsg);
|
code = syncNodeDoMakeLogSame(ths, pMsg->prevLogIndex + 1);
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t retArrSize = 0;
|
|
||||||
if (hasAppendEntries) {
|
if (hasAppendEntries) {
|
||||||
SRpcMsg rpcMsgArr[SYNC_MAX_BATCH_SIZE];
|
|
||||||
memset(rpcMsgArr, 0, sizeof(rpcMsgArr));
|
|
||||||
syncAppendEntriesBatch2RpcMsgArray(pMsg, rpcMsgArr, SYNC_MAX_BATCH_SIZE, &retArrSize);
|
|
||||||
|
|
||||||
// append entry batch
|
// append entry batch
|
||||||
for (int32_t i = 0; i < retArrSize; ++i) {
|
for (int32_t i = 0; i < pMsg->dataCount; ++i) {
|
||||||
SSyncRaftEntry* pAppendEntry = syncEntryBuild(1234);
|
SSyncRaftEntry* pAppendEntry = (SSyncRaftEntry*)(pMsg->data + metaTableArr[i].offset);
|
||||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pAppendEntry);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -951,7 +1013,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
pReply->term = ths->pRaftStore->currentTerm;
|
pReply->term = ths->pRaftStore->currentTerm;
|
||||||
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
|
pReply->privateTerm = ths->pNewNodeReceiver->privateTerm;
|
||||||
pReply->success = true;
|
pReply->success = true;
|
||||||
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + retArrSize : pMsg->prevLogIndex;
|
pReply->matchIndex = hasAppendEntries ? pMsg->prevLogIndex + pMsg->dataCount : pMsg->prevLogIndex;
|
||||||
|
|
||||||
// send response
|
// send response
|
||||||
SRpcMsg rpcMsg;
|
SRpcMsg rpcMsg;
|
||||||
|
@ -991,11 +1053,11 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
|
||||||
ASSERT(code == 0);
|
ASSERT(code == 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret;
|
return 0;
|
||||||
}
|
}
|
||||||
} while (0);
|
} while (0);
|
||||||
|
|
||||||
return ret;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||||
|
|
|
@ -119,11 +119,11 @@ static void syncNodeStartSnapshot(SSyncNode* ths, SyncIndex beginIndex, SyncInde
|
||||||
.data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID};
|
.data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID};
|
||||||
|
|
||||||
void* pReader = NULL;
|
void* pReader = NULL;
|
||||||
SReaderParam readerParam = {.start = beginIndex, .end = endIndex};
|
SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex};
|
||||||
ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader);
|
ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader);
|
||||||
if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) {
|
if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) {
|
||||||
ASSERT(pReader != NULL);
|
ASSERT(pReader != NULL);
|
||||||
snapshotSenderStart(pSender, snapshot, pReader);
|
snapshotSenderStart(pSender, readerParam, snapshot, pReader);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
if (pReader != NULL) {
|
if (pReader != NULL) {
|
||||||
|
@ -165,23 +165,22 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
|
||||||
|
|
||||||
if (ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex) &&
|
if (ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex) &&
|
||||||
ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex - 1)) {
|
ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex - 1)) {
|
||||||
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
|
// update next-index, match-index
|
||||||
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), newNextIndex);
|
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), newNextIndex);
|
||||||
|
|
||||||
// matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
|
|
||||||
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
|
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
|
||||||
|
|
||||||
// maybe commit
|
// maybe commit
|
||||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
syncMaybeAdvanceCommitIndex(ths);
|
syncMaybeAdvanceCommitIndex(ths);
|
||||||
}
|
}
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// start snapshot <match+1, old snapshot.end>
|
// start snapshot <match+1, old snapshot.end>
|
||||||
SSnapshot snapshot;
|
SSnapshot oldSnapshot;
|
||||||
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
|
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot);
|
||||||
syncNodeStartSnapshot(ths, newMatchIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pMsg);
|
syncNodeStartSnapshot(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm, pMsg);
|
||||||
|
|
||||||
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), snapshot.lastApplyIndex + 1);
|
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), oldSnapshot.lastApplyIndex + 1);
|
||||||
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
|
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -301,7 +300,8 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
|
||||||
!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) {
|
!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) {
|
||||||
// has snapshot
|
// has snapshot
|
||||||
ASSERT(pReader != NULL);
|
ASSERT(pReader != NULL);
|
||||||
snapshotSenderStart(pSender, snapshot, pReader);
|
SSnapshotParam readerParam = {.start = 0, .end = snapshot.lastApplyIndex};
|
||||||
|
snapshotSenderStart(pSender, readerParam, snapshot, pReader);
|
||||||
|
|
||||||
} else {
|
} else {
|
||||||
// no snapshot
|
// no snapshot
|
||||||
|
|
|
@ -815,7 +815,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
|
||||||
// create a new raft config file
|
// create a new raft config file
|
||||||
SRaftCfgMeta meta;
|
SRaftCfgMeta meta;
|
||||||
meta.isStandBy = pSyncInfo->isStandBy;
|
meta.isStandBy = pSyncInfo->isStandBy;
|
||||||
meta.snapshotEnable = pSyncInfo->snapshotEnable;
|
meta.snapshotEnable = pSyncInfo->snapshotStrategy;
|
||||||
meta.lastConfigIndex = SYNC_INDEX_INVALID;
|
meta.lastConfigIndex = SYNC_INDEX_INVALID;
|
||||||
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
|
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
|
||||||
ASSERT(ret == 0);
|
ASSERT(ret == 0);
|
||||||
|
@ -1100,7 +1100,9 @@ void syncNodeClose(SSyncNode* pSyncNode) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// option
|
// option
|
||||||
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }
|
// bool syncNodeSnapshotEnable(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }
|
||||||
|
|
||||||
|
ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode) { return pSyncNode->pRaftCfg->snapshotEnable; }
|
||||||
|
|
||||||
// ping --------------
|
// ping --------------
|
||||||
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
|
int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
|
||||||
|
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#include "syncMessage.h"
|
#include "syncMessage.h"
|
||||||
#include "syncRaftCfg.h"
|
#include "syncRaftCfg.h"
|
||||||
|
#include "syncRaftEntry.h"
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
#include "tcoding.h"
|
#include "tcoding.h"
|
||||||
|
|
||||||
|
@ -996,7 +997,135 @@ SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMet
|
||||||
return pMsg;
|
return pMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg) {}
|
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg) {
|
||||||
|
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||||
|
pRpcMsg->msgType = pSyncMsg->msgType;
|
||||||
|
pRpcMsg->contLen = pSyncMsg->bytes;
|
||||||
|
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||||
|
memcpy(pRpcMsg->pCont, pSyncMsg, pRpcMsg->contLen);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg) {
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg) {
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
int32_t arrSize = pMsg->dataCount;
|
||||||
|
int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize;
|
||||||
|
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen);
|
||||||
|
for (int i = 0; i < arrSize; ++i) {
|
||||||
|
if (msgArr[i].pCont != NULL) {
|
||||||
|
rpcFreeCont(msgArr[i].pCont);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
taosMemoryFree(pMsg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg) {
|
||||||
|
SRaftMeta* raftMetaArr = (SRaftMeta*)(pSyncMsg->data);
|
||||||
|
return raftMetaArr;
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg) {
|
||||||
|
int32_t arrSize = pSyncMsg->dataCount;
|
||||||
|
int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize;
|
||||||
|
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pSyncMsg->data) + raftMetaArrayLen);
|
||||||
|
return msgArr;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg) {
|
||||||
|
SyncClientRequestBatch* pSyncMsg = taosMemoryMalloc(pRpcMsg->contLen);
|
||||||
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
memcpy(pSyncMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||||
|
ASSERT(pRpcMsg->contLen == pSyncMsg->bytes);
|
||||||
|
|
||||||
|
return pSyncMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg) {
|
||||||
|
char u64buf[128] = {0};
|
||||||
|
cJSON* pRoot = cJSON_CreateObject();
|
||||||
|
|
||||||
|
if (pMsg != NULL) {
|
||||||
|
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
||||||
|
cJSON_AddNumberToObject(pRoot, "dataCount", pMsg->dataCount);
|
||||||
|
|
||||||
|
SRaftMeta* metaArr = syncClientRequestBatchMetaArr(pMsg);
|
||||||
|
SRpcMsg* msgArr = syncClientRequestBatchRpcMsgArr(pMsg);
|
||||||
|
|
||||||
|
cJSON* pMetaArr = cJSON_CreateArray();
|
||||||
|
cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr);
|
||||||
|
for (int i = 0; i < pMsg->dataCount; ++i) {
|
||||||
|
cJSON* pMeta = cJSON_CreateObject();
|
||||||
|
cJSON_AddNumberToObject(pMeta, "seqNum", metaArr[i].seqNum);
|
||||||
|
cJSON_AddNumberToObject(pMeta, "isWeak", metaArr[i].isWeak);
|
||||||
|
cJSON_AddItemToArray(pMetaArr, pMeta);
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* pMsgArr = cJSON_CreateArray();
|
||||||
|
cJSON_AddItemToObject(pRoot, "msgArr", pMsgArr);
|
||||||
|
for (int i = 0; i < pMsg->dataCount; ++i) {
|
||||||
|
cJSON* pRpcMsgJson = syncRpcMsg2Json(&msgArr[i]);
|
||||||
|
cJSON_AddItemToArray(pMsgArr, pRpcMsgJson);
|
||||||
|
}
|
||||||
|
|
||||||
|
char* s;
|
||||||
|
s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen);
|
||||||
|
cJSON_AddStringToObject(pRoot, "data", s);
|
||||||
|
taosMemoryFree(s);
|
||||||
|
s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen);
|
||||||
|
cJSON_AddStringToObject(pRoot, "data2", s);
|
||||||
|
taosMemoryFree(s);
|
||||||
|
}
|
||||||
|
|
||||||
|
cJSON* pJson = cJSON_CreateObject();
|
||||||
|
cJSON_AddItemToObject(pJson, "SyncClientRequestBatch", pRoot);
|
||||||
|
return pJson;
|
||||||
|
}
|
||||||
|
|
||||||
|
char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg) {
|
||||||
|
cJSON* pJson = syncClientRequestBatch2Json(pMsg);
|
||||||
|
char* serialized = cJSON_Print(pJson);
|
||||||
|
cJSON_Delete(pJson);
|
||||||
|
return serialized;
|
||||||
|
}
|
||||||
|
|
||||||
|
// for debug ----------------------
|
||||||
|
void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg) {
|
||||||
|
char* serialized = syncClientRequestBatch2Str(pMsg);
|
||||||
|
printf("syncClientRequestBatchPrint | len:%lu | %s \n", strlen(serialized), serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg) {
|
||||||
|
char* serialized = syncClientRequestBatch2Str(pMsg);
|
||||||
|
printf("syncClientRequestBatchPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
|
||||||
|
fflush(NULL);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg) {
|
||||||
|
char* serialized = syncClientRequestBatch2Str(pMsg);
|
||||||
|
sTrace("syncClientRequestBatchLog | len:%lu | %s", strlen(serialized), serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
|
||||||
|
void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg) {
|
||||||
|
if (gRaftDetailLog) {
|
||||||
|
char* serialized = syncClientRequestBatch2Str(pMsg);
|
||||||
|
sTraceLong("syncClientRequestBatchLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ---- message process SyncRequestVote----
|
// ---- message process SyncRequestVote----
|
||||||
SyncRequestVote* syncRequestVoteBuild(int32_t vgId) {
|
SyncRequestVote* syncRequestVoteBuild(int32_t vgId) {
|
||||||
|
@ -1472,21 +1601,20 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) {
|
||||||
|
|
||||||
// block1: SOffsetAndContLen
|
// block1: SOffsetAndContLen
|
||||||
// block2: SOffsetAndContLen Array
|
// block2: SOffsetAndContLen Array
|
||||||
// block3: SRpcMsg Array
|
// block3: entry Array
|
||||||
// block4: SRpcMsg pCont Array
|
|
||||||
|
|
||||||
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId) {
|
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId) {
|
||||||
ASSERT(rpcMsgArr != NULL);
|
ASSERT(entryPArr != NULL);
|
||||||
ASSERT(arrSize > 0);
|
ASSERT(arrSize > 0);
|
||||||
|
|
||||||
int32_t dataLen = 0;
|
int32_t dataLen = 0;
|
||||||
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * arrSize; // <offset, contLen>
|
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * arrSize; // <offset, contLen>
|
||||||
int32_t rpcArrayLen = sizeof(SRpcMsg) * arrSize; // SRpcMsg
|
int32_t entryArrayLen = 0;
|
||||||
int32_t contArrayLen = 0;
|
|
||||||
for (int i = 0; i < arrSize; ++i) { // SRpcMsg pCont
|
for (int i = 0; i < arrSize; ++i) { // SRpcMsg pCont
|
||||||
contArrayLen += rpcMsgArr[i].contLen;
|
SSyncRaftEntry* pEntry = entryPArr[i];
|
||||||
|
entryArrayLen += pEntry->bytes;
|
||||||
}
|
}
|
||||||
dataLen += (metaArrayLen + rpcArrayLen + contArrayLen);
|
dataLen += (metaArrayLen + entryArrayLen);
|
||||||
|
|
||||||
uint32_t bytes = sizeof(SyncAppendEntriesBatch) + dataLen;
|
uint32_t bytes = sizeof(SyncAppendEntriesBatch) + dataLen;
|
||||||
SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes);
|
SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes);
|
||||||
|
@ -1498,30 +1626,30 @@ SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t
|
||||||
pMsg->dataLen = dataLen;
|
pMsg->dataLen = dataLen;
|
||||||
|
|
||||||
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data);
|
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data);
|
||||||
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + metaArrayLen);
|
|
||||||
char* pData = pMsg->data;
|
char* pData = pMsg->data;
|
||||||
|
|
||||||
for (int i = 0; i < arrSize; ++i) {
|
for (int i = 0; i < arrSize; ++i) {
|
||||||
// init <offset, contLen>
|
// init meta <offset, contLen>
|
||||||
if (i == 0) {
|
if (i == 0) {
|
||||||
metaArr[i].offset = metaArrayLen + rpcArrayLen;
|
metaArr[i].offset = metaArrayLen;
|
||||||
metaArr[i].contLen = rpcMsgArr[i].contLen;
|
metaArr[i].contLen = entryPArr[i]->bytes;
|
||||||
} else {
|
} else {
|
||||||
metaArr[i].offset = metaArr[i - 1].offset + metaArr[i - 1].contLen;
|
metaArr[i].offset = metaArr[i - 1].offset + metaArr[i - 1].contLen;
|
||||||
metaArr[i].contLen = rpcMsgArr[i].contLen;
|
metaArr[i].contLen = entryPArr[i]->bytes;
|
||||||
}
|
}
|
||||||
|
|
||||||
// init msgArr
|
// init entry array
|
||||||
msgArr[i] = rpcMsgArr[i];
|
ASSERT(metaArr[i].contLen == entryPArr[i]->bytes);
|
||||||
|
memcpy(pData + metaArr[i].offset, entryPArr[i], metaArr[i].contLen);
|
||||||
// init data
|
|
||||||
ASSERT(rpcMsgArr[i].contLen == metaArr[i].contLen);
|
|
||||||
memcpy(pData + metaArr[i].offset, rpcMsgArr[i].pCont, rpcMsgArr[i].contLen);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return pMsg;
|
return pMsg;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg) {
|
||||||
|
return (SOffsetAndContLen*)(pMsg->data);
|
||||||
|
}
|
||||||
|
|
||||||
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg) {
|
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg) {
|
||||||
if (pMsg != NULL) {
|
if (pMsg != NULL) {
|
||||||
taosMemoryFree(pMsg);
|
taosMemoryFree(pMsg);
|
||||||
|
@ -1634,16 +1762,12 @@ cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg) {
|
||||||
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
||||||
|
|
||||||
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pMsg->dataCount; // <offset, contLen>
|
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pMsg->dataCount; // <offset, contLen>
|
||||||
int32_t rpcArrayLen = sizeof(SRpcMsg) * pMsg->dataCount; // SRpcMsg
|
int32_t entryArrayLen = pMsg->dataLen - metaArrayLen;
|
||||||
int32_t contArrayLen = pMsg->dataLen - metaArrayLen - rpcArrayLen;
|
|
||||||
|
|
||||||
cJSON_AddNumberToObject(pRoot, "metaArrayLen", metaArrayLen);
|
cJSON_AddNumberToObject(pRoot, "metaArrayLen", metaArrayLen);
|
||||||
cJSON_AddNumberToObject(pRoot, "rpcArrayLen", rpcArrayLen);
|
cJSON_AddNumberToObject(pRoot, "entryArrayLen", entryArrayLen);
|
||||||
cJSON_AddNumberToObject(pRoot, "contArrayLen", contArrayLen);
|
|
||||||
|
|
||||||
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data);
|
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data);
|
||||||
SRpcMsg* msgArr = (SRpcMsg*)(pMsg->data + metaArrayLen);
|
|
||||||
void* pData = (void*)(pMsg->data + metaArrayLen + rpcArrayLen);
|
|
||||||
|
|
||||||
cJSON* pMetaArr = cJSON_CreateArray();
|
cJSON* pMetaArr = cJSON_CreateArray();
|
||||||
cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr);
|
cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr);
|
||||||
|
@ -1654,14 +1778,12 @@ cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg) {
|
||||||
cJSON_AddItemToArray(pMetaArr, pMeta);
|
cJSON_AddItemToArray(pMetaArr, pMeta);
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON* pMsgArr = cJSON_CreateArray();
|
cJSON* pEntryArr = cJSON_CreateArray();
|
||||||
cJSON_AddItemToObject(pRoot, "msgArr", pMsgArr);
|
cJSON_AddItemToObject(pRoot, "entryArr", pEntryArr);
|
||||||
for (int i = 0; i < pMsg->dataCount; ++i) {
|
for (int i = 0; i < pMsg->dataCount; ++i) {
|
||||||
cJSON* pRpcMsgJson = cJSON_CreateObject();
|
SSyncRaftEntry* pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset);
|
||||||
cJSON_AddNumberToObject(pRpcMsgJson, "code", msgArr[i].code);
|
cJSON* pEntryJson = syncEntry2Json(pEntry);
|
||||||
cJSON_AddNumberToObject(pRpcMsgJson, "contLen", msgArr[i].contLen);
|
cJSON_AddItemToArray(pEntryArr, pEntryJson);
|
||||||
cJSON_AddNumberToObject(pRpcMsgJson, "msgType", msgArr[i].msgType);
|
|
||||||
cJSON_AddItemToArray(pMsgArr, pRpcMsgJson);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
char* s;
|
char* s;
|
||||||
|
@ -1685,33 +1807,6 @@ char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg) {
|
||||||
return serialized;
|
return serialized;
|
||||||
}
|
}
|
||||||
|
|
||||||
void syncAppendEntriesBatch2RpcMsgArray(SyncAppendEntriesBatch* pSyncMsg, SRpcMsg* rpcMsgArr, int32_t maxArrSize,
|
|
||||||
int32_t* pRetArrSize) {
|
|
||||||
if (pRetArrSize != NULL) {
|
|
||||||
*pRetArrSize = pSyncMsg->dataCount;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t arrSize = pSyncMsg->dataCount;
|
|
||||||
if (arrSize > maxArrSize) {
|
|
||||||
arrSize = maxArrSize;
|
|
||||||
}
|
|
||||||
|
|
||||||
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pSyncMsg->dataCount; // <offset, contLen>
|
|
||||||
int32_t rpcArrayLen = sizeof(SRpcMsg) * pSyncMsg->dataCount; // SRpcMsg
|
|
||||||
int32_t contArrayLen = pSyncMsg->dataLen - metaArrayLen - rpcArrayLen;
|
|
||||||
|
|
||||||
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pSyncMsg->data);
|
|
||||||
SRpcMsg* msgArr = (SRpcMsg*)(pSyncMsg->data + metaArrayLen);
|
|
||||||
void* pData = pSyncMsg->data + metaArrayLen + rpcArrayLen;
|
|
||||||
|
|
||||||
for (int i = 0; i < arrSize; ++i) {
|
|
||||||
rpcMsgArr[i] = msgArr[i];
|
|
||||||
rpcMsgArr[i].pCont = rpcMallocCont(msgArr[i].contLen);
|
|
||||||
void* pRpcCont = pSyncMsg->data + metaArr[i].offset;
|
|
||||||
memcpy(rpcMsgArr[i].pCont, pRpcCont, rpcMsgArr[i].contLen);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// for debug ----------------------
|
// for debug ----------------------
|
||||||
void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg) {
|
void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg) {
|
||||||
char* serialized = syncAppendEntriesBatch2Str(pMsg);
|
char* serialized = syncAppendEntriesBatch2Str(pMsg);
|
||||||
|
@ -2159,6 +2254,9 @@ cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg) {
|
||||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->privateTerm);
|
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->privateTerm);
|
||||||
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
|
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
|
||||||
|
|
||||||
|
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->beginIndex);
|
||||||
|
cJSON_AddStringToObject(pRoot, "beginIndex", u64buf);
|
||||||
|
|
||||||
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastIndex);
|
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->lastIndex);
|
||||||
cJSON_AddStringToObject(pRoot, "lastIndex", u64buf);
|
cJSON_AddStringToObject(pRoot, "lastIndex", u64buf);
|
||||||
|
|
||||||
|
|
|
@ -50,6 +50,22 @@ SSyncRaftEntry* syncEntryBuild3(SyncClientRequest* pMsg, SyncTerm term, SyncInde
|
||||||
return pEntry;
|
return pEntry;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
SSyncRaftEntry* syncEntryBuild4(SRpcMsg* pOriginalMsg, SyncTerm term, SyncIndex index) {
|
||||||
|
SSyncRaftEntry* pEntry = syncEntryBuild(pOriginalMsg->contLen);
|
||||||
|
ASSERT(pEntry != NULL);
|
||||||
|
|
||||||
|
pEntry->msgType = TDMT_SYNC_CLIENT_REQUEST;
|
||||||
|
pEntry->originalRpcType = pOriginalMsg->msgType;
|
||||||
|
pEntry->seqNum = 0;
|
||||||
|
pEntry->isWeak = 0;
|
||||||
|
pEntry->term = term;
|
||||||
|
pEntry->index = index;
|
||||||
|
pEntry->dataLen = pOriginalMsg->contLen;
|
||||||
|
memcpy(pEntry->data, pOriginalMsg->pCont, pOriginalMsg->contLen);
|
||||||
|
|
||||||
|
return pEntry;
|
||||||
|
}
|
||||||
|
|
||||||
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) {
|
SSyncRaftEntry* syncEntryBuildNoop(SyncTerm term, SyncIndex index, int32_t vgId) {
|
||||||
// init rpcMsg
|
// init rpcMsg
|
||||||
SMsgHead head;
|
SMsgHead head;
|
||||||
|
|
|
@ -32,6 +32,7 @@ static SyncTerm raftLogLastTerm(struct SSyncLogStore* pLogStore);
|
||||||
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
|
static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry);
|
||||||
static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
|
static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncRaftEntry** ppEntry);
|
||||||
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
|
static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIndex);
|
||||||
|
static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index);
|
||||||
|
|
||||||
// private function
|
// private function
|
||||||
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry);
|
static int32_t raftLogGetLastEntry(SSyncLogStore* pLogStore, SSyncRaftEntry** ppLastEntry);
|
||||||
|
@ -83,6 +84,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
|
||||||
pLogStore->syncLogGetEntry = raftLogGetEntry;
|
pLogStore->syncLogGetEntry = raftLogGetEntry;
|
||||||
pLogStore->syncLogTruncate = raftLogTruncate;
|
pLogStore->syncLogTruncate = raftLogTruncate;
|
||||||
pLogStore->syncLogWriteIndex = raftLogWriteIndex;
|
pLogStore->syncLogWriteIndex = raftLogWriteIndex;
|
||||||
|
pLogStore->syncLogExist = raftLogExist;
|
||||||
|
|
||||||
return pLogStore;
|
return pLogStore;
|
||||||
}
|
}
|
||||||
|
@ -168,6 +170,13 @@ static SyncIndex raftLogWriteIndex(struct SSyncLogStore* pLogStore) {
|
||||||
return lastVer + 1;
|
return lastVer + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static bool raftLogExist(struct SSyncLogStore* pLogStore, SyncIndex index) {
|
||||||
|
SSyncLogStoreData* pData = pLogStore->data;
|
||||||
|
SWal* pWal = pData->pWal;
|
||||||
|
bool b = walLogExist(pWal, index);
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
// if success, return last term
|
// if success, return last term
|
||||||
// if not log, return 0
|
// if not log, return 0
|
||||||
// if error, return SYNC_TERM_INVALID
|
// if error, return SYNC_TERM_INVALID
|
||||||
|
|
|
@ -145,26 +145,34 @@ int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
SRpcMsg rpcMsgArr[SYNC_MAX_BATCH_SIZE];
|
SSyncRaftEntry* entryPArr[SYNC_MAX_BATCH_SIZE];
|
||||||
memset(rpcMsgArr, 0, sizeof(rpcMsgArr));
|
memset(entryPArr, 0, sizeof(entryPArr));
|
||||||
|
|
||||||
int32_t getCount = 0;
|
int32_t getCount = 0;
|
||||||
|
SyncIndex getEntryIndex = nextIndex;
|
||||||
for (int32_t i = 0; i < pSyncNode->batchSize; ++i) {
|
for (int32_t i = 0; i < pSyncNode->batchSize; ++i) {
|
||||||
SSyncRaftEntry* pEntry;
|
SSyncRaftEntry* pEntry;
|
||||||
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);
|
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, getEntryIndex, &pEntry);
|
||||||
if (code == 0) {
|
if (code == 0) {
|
||||||
ASSERT(pEntry != NULL);
|
ASSERT(pEntry != NULL);
|
||||||
// get rpc msg [i] from entry
|
entryPArr[i] = pEntry;
|
||||||
syncEntryDestory(pEntry);
|
|
||||||
getCount++;
|
getCount++;
|
||||||
} else {
|
} else {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(rpcMsgArr, getCount, pSyncNode->vgId);
|
SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(entryPArr, getCount, pSyncNode->vgId);
|
||||||
ASSERT(pMsg != NULL);
|
ASSERT(pMsg != NULL);
|
||||||
|
|
||||||
|
for (int32_t i = 0; i < pSyncNode->batchSize; ++i) {
|
||||||
|
SSyncRaftEntry* pEntry = entryPArr[i];
|
||||||
|
if (pEntry != NULL) {
|
||||||
|
syncEntryDestory(pEntry);
|
||||||
|
entryPArr[i] = NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// prepare msg
|
// prepare msg
|
||||||
pMsg->srcId = pSyncNode->myRaftId;
|
pMsg->srcId = pSyncNode->myRaftId;
|
||||||
pMsg->destId = *pDestId;
|
pMsg->destId = *pDestId;
|
||||||
|
|
|
@ -80,13 +80,15 @@ void snapshotSenderDestroy(SSyncSnapshotSender *pSender) {
|
||||||
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }
|
bool snapshotSenderIsStart(SSyncSnapshotSender *pSender) { return pSender->start; }
|
||||||
|
|
||||||
// begin send snapshot by snapshot, pReader
|
// begin send snapshot by snapshot, pReader
|
||||||
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, void *pReader) {
|
int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshotParam snapshotParam, SSnapshot snapshot,
|
||||||
|
void *pReader) {
|
||||||
ASSERT(!snapshotSenderIsStart(pSender));
|
ASSERT(!snapshotSenderIsStart(pSender));
|
||||||
|
|
||||||
// init snapshot and reader
|
// init snapshot, parm, reader
|
||||||
ASSERT(pSender->pReader == NULL);
|
ASSERT(pSender->pReader == NULL);
|
||||||
pSender->pReader = pReader;
|
pSender->pReader = pReader;
|
||||||
pSender->snapshot = snapshot;
|
pSender->snapshot = snapshot;
|
||||||
|
pSender->snapshotParam = snapshotParam;
|
||||||
|
|
||||||
// init current block
|
// init current block
|
||||||
if (pSender->pCurrentBlock != NULL) {
|
if (pSender->pCurrentBlock != NULL) {
|
||||||
|
@ -162,6 +164,7 @@ int32_t snapshotSenderStart(SSyncSnapshotSender *pSender, SSnapshot snapshot, vo
|
||||||
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
pMsg->srcId = pSender->pSyncNode->myRaftId;
|
||||||
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
|
pMsg->destId = (pSender->pSyncNode->replicasId)[pSender->replicaIndex];
|
||||||
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
|
pMsg->term = pSender->pSyncNode->pRaftStore->currentTerm;
|
||||||
|
pMsg->beginIndex = pSender->snapshotParam.start;
|
||||||
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
pMsg->lastIndex = pSender->snapshot.lastApplyIndex;
|
||||||
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
pMsg->lastTerm = pSender->snapshot.lastApplyTerm;
|
||||||
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
pMsg->lastConfigIndex = pSender->snapshot.lastConfigIndex;
|
||||||
|
@ -439,10 +442,13 @@ static void snapshotReceiverDoStart(SSyncSnapshotReceiver *pReceiver, SyncTerm p
|
||||||
pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
|
pReceiver->snapshot.lastApplyIndex = pBeginMsg->lastIndex;
|
||||||
pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
|
pReceiver->snapshot.lastApplyTerm = pBeginMsg->lastTerm;
|
||||||
pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
|
pReceiver->snapshot.lastConfigIndex = pBeginMsg->lastConfigIndex;
|
||||||
|
pReceiver->snapshotParam.start = pBeginMsg->beginIndex;
|
||||||
|
pReceiver->snapshotParam.end = pBeginMsg->lastIndex;
|
||||||
|
|
||||||
// write data
|
// write data
|
||||||
ASSERT(pReceiver->pWriter == NULL);
|
ASSERT(pReceiver->pWriter == NULL);
|
||||||
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm, &(pReceiver->pWriter));
|
int32_t ret = pReceiver->pSyncNode->pFsm->FpSnapshotStartWrite(pReceiver->pSyncNode->pFsm,
|
||||||
|
&(pReceiver->snapshotParam), &(pReceiver->pWriter));
|
||||||
ASSERT(ret == 0);
|
ASSERT(ret == 0);
|
||||||
|
|
||||||
// event log
|
// event log
|
||||||
|
|
|
@ -24,6 +24,7 @@ add_executable(syncAppendEntriesTest "")
|
||||||
add_executable(syncAppendEntriesBatchTest "")
|
add_executable(syncAppendEntriesBatchTest "")
|
||||||
add_executable(syncAppendEntriesReplyTest "")
|
add_executable(syncAppendEntriesReplyTest "")
|
||||||
add_executable(syncClientRequestTest "")
|
add_executable(syncClientRequestTest "")
|
||||||
|
add_executable(syncClientRequestBatchTest "")
|
||||||
add_executable(syncTimeoutTest "")
|
add_executable(syncTimeoutTest "")
|
||||||
add_executable(syncPingTest "")
|
add_executable(syncPingTest "")
|
||||||
add_executable(syncPingReplyTest "")
|
add_executable(syncPingReplyTest "")
|
||||||
|
@ -159,6 +160,10 @@ target_sources(syncClientRequestTest
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"syncClientRequestTest.cpp"
|
"syncClientRequestTest.cpp"
|
||||||
)
|
)
|
||||||
|
target_sources(syncClientRequestBatchTest
|
||||||
|
PRIVATE
|
||||||
|
"syncClientRequestBatchTest.cpp"
|
||||||
|
)
|
||||||
target_sources(syncTimeoutTest
|
target_sources(syncTimeoutTest
|
||||||
PRIVATE
|
PRIVATE
|
||||||
"syncTimeoutTest.cpp"
|
"syncTimeoutTest.cpp"
|
||||||
|
@ -407,6 +412,11 @@ target_include_directories(syncClientRequestTest
|
||||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
)
|
)
|
||||||
|
target_include_directories(syncClientRequestBatchTest
|
||||||
|
PUBLIC
|
||||||
|
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||||
|
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
|
)
|
||||||
target_include_directories(syncTimeoutTest
|
target_include_directories(syncTimeoutTest
|
||||||
PUBLIC
|
PUBLIC
|
||||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||||
|
@ -658,6 +668,10 @@ target_link_libraries(syncClientRequestTest
|
||||||
sync
|
sync
|
||||||
gtest_main
|
gtest_main
|
||||||
)
|
)
|
||||||
|
target_link_libraries(syncClientRequestBatchTest
|
||||||
|
sync
|
||||||
|
gtest_main
|
||||||
|
)
|
||||||
target_link_libraries(syncTimeoutTest
|
target_link_libraries(syncTimeoutTest
|
||||||
sync
|
sync
|
||||||
gtest_main
|
gtest_main
|
||||||
|
|
|
@ -3,6 +3,7 @@
|
||||||
#include "syncIO.h"
|
#include "syncIO.h"
|
||||||
#include "syncInt.h"
|
#include "syncInt.h"
|
||||||
#include "syncMessage.h"
|
#include "syncMessage.h"
|
||||||
|
#include "syncRaftEntry.h"
|
||||||
#include "syncUtil.h"
|
#include "syncUtil.h"
|
||||||
#include "trpc.h"
|
#include "trpc.h"
|
||||||
|
|
||||||
|
@ -15,30 +16,29 @@ void logTest() {
|
||||||
sFatal("--- sync log test: fatal");
|
sFatal("--- sync log test: fatal");
|
||||||
}
|
}
|
||||||
|
|
||||||
SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) {
|
SSyncRaftEntry *createEntry(int i) {
|
||||||
SRpcMsg *pRpcMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg));
|
SSyncRaftEntry *pEntry = syncEntryBuild(20);
|
||||||
memset(pRpcMsg, 0, sizeof(SRpcMsg));
|
assert(pEntry != NULL);
|
||||||
|
pEntry->msgType = 1;
|
||||||
pRpcMsg->msgType = TDMT_SYNC_PING;
|
pEntry->originalRpcType = 2;
|
||||||
pRpcMsg->contLen = dataLen;
|
pEntry->seqNum = 3;
|
||||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
pEntry->isWeak = true;
|
||||||
pRpcMsg->code = 10 * i;
|
pEntry->term = 100;
|
||||||
snprintf((char *)pRpcMsg->pCont, pRpcMsg->contLen, "value_%d", i);
|
pEntry->index = 200;
|
||||||
|
snprintf(pEntry->data, pEntry->dataLen, "value_%d", i);
|
||||||
return pRpcMsg;
|
return pEntry;
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncAppendEntriesBatch *createMsg() {
|
SyncAppendEntriesBatch *createMsg() {
|
||||||
SRpcMsg rpcMsgArr[5];
|
SSyncRaftEntry *entryPArr[5];
|
||||||
memset(rpcMsgArr, 0, sizeof(rpcMsgArr));
|
memset(entryPArr, 0, sizeof(entryPArr));
|
||||||
|
|
||||||
for (int32_t i = 0; i < 5; ++i) {
|
for (int32_t i = 0; i < 5; ++i) {
|
||||||
SRpcMsg *pRpcMsg = createRpcMsg(i, 20);
|
SSyncRaftEntry *pEntry = createEntry(i);
|
||||||
rpcMsgArr[i] = *pRpcMsg;
|
entryPArr[i] = pEntry;
|
||||||
taosMemoryFree(pRpcMsg);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
SyncAppendEntriesBatch *pMsg = syncAppendEntriesBatchBuild(rpcMsgArr, 5, 1234);
|
SyncAppendEntriesBatch *pMsg = syncAppendEntriesBatchBuild(entryPArr, 5, 1234);
|
||||||
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
|
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
|
||||||
pMsg->srcId.vgId = 100;
|
pMsg->srcId.vgId = 100;
|
||||||
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
|
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
|
||||||
|
@ -52,17 +52,17 @@ SyncAppendEntriesBatch *createMsg() {
|
||||||
|
|
||||||
void test1() {
|
void test1() {
|
||||||
SyncAppendEntriesBatch *pMsg = createMsg();
|
SyncAppendEntriesBatch *pMsg = createMsg();
|
||||||
syncAppendEntriesBatchLog2((char *)"test1:", pMsg);
|
syncAppendEntriesBatchLog2((char *)"==test1==", pMsg);
|
||||||
|
|
||||||
SRpcMsg rpcMsgArr[5];
|
/*
|
||||||
int32_t retArrSize;
|
SOffsetAndContLen *metaArr = syncAppendEntriesBatchMetaTableArray(pMsg);
|
||||||
syncAppendEntriesBatch2RpcMsgArray(pMsg, rpcMsgArr, 5, &retArrSize);
|
int32_t retArrSize = pMsg->dataCount;
|
||||||
for (int i = 0; i < retArrSize; ++i) {
|
for (int i = 0; i < retArrSize; ++i) {
|
||||||
char logBuf[128];
|
SSyncRaftEntry *pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset);
|
||||||
snprintf(logBuf, sizeof(logBuf), "==test1 decode rpc msg %d: msgType:%d, code:%d, contLen:%d, pCont:%s \n", i,
|
ASSERT(pEntry->bytes == metaArr[i].contLen);
|
||||||
rpcMsgArr[i].msgType, rpcMsgArr[i].code, rpcMsgArr[i].contLen, (char *)rpcMsgArr[i].pCont);
|
syncEntryPrint(pEntry);
|
||||||
sTrace("%s", logBuf);
|
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
syncAppendEntriesBatchDestroy(pMsg);
|
syncAppendEntriesBatchDestroy(pMsg);
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,125 @@
|
||||||
|
#include <gtest/gtest.h>
|
||||||
|
#include <stdio.h>
|
||||||
|
#include "syncIO.h"
|
||||||
|
#include "syncInt.h"
|
||||||
|
#include "syncMessage.h"
|
||||||
|
#include "syncUtil.h"
|
||||||
|
|
||||||
|
void logTest() {
|
||||||
|
sTrace("--- sync log test: trace");
|
||||||
|
sDebug("--- sync log test: debug");
|
||||||
|
sInfo("--- sync log test: info");
|
||||||
|
sWarn("--- sync log test: warn");
|
||||||
|
sError("--- sync log test: error");
|
||||||
|
sFatal("--- sync log test: fatal");
|
||||||
|
}
|
||||||
|
|
||||||
|
SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) {
|
||||||
|
SyncPing *pSyncMsg = syncPingBuild(20);
|
||||||
|
snprintf(pSyncMsg->data, pSyncMsg->dataLen, "value_%d", i);
|
||||||
|
|
||||||
|
SRpcMsg *pRpcMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg));
|
||||||
|
memset(pRpcMsg, 0, sizeof(SRpcMsg));
|
||||||
|
pRpcMsg->code = 10 * i;
|
||||||
|
syncPing2RpcMsg(pSyncMsg, pRpcMsg);
|
||||||
|
|
||||||
|
syncPingDestroy(pSyncMsg);
|
||||||
|
return pRpcMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncClientRequestBatch *createMsg() {
|
||||||
|
SRpcMsg rpcMsgArr[5];
|
||||||
|
memset(rpcMsgArr, 0, sizeof(rpcMsgArr));
|
||||||
|
for (int32_t i = 0; i < 5; ++i) {
|
||||||
|
SRpcMsg *pRpcMsg = createRpcMsg(i, 20);
|
||||||
|
rpcMsgArr[i] = *pRpcMsg;
|
||||||
|
taosMemoryFree(pRpcMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
SRaftMeta raftArr[5];
|
||||||
|
memset(raftArr, 0, sizeof(raftArr));
|
||||||
|
for (int32_t i = 0; i < 5; ++i) {
|
||||||
|
raftArr[i].seqNum = i * 10;
|
||||||
|
raftArr[i].isWeak = i % 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
SyncClientRequestBatch *pMsg = syncClientRequestBatchBuild(rpcMsgArr, raftArr, 5, 1234);
|
||||||
|
return pMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
void test1() {
|
||||||
|
SyncClientRequestBatch *pMsg = createMsg();
|
||||||
|
syncClientRequestBatchLog2((char *)"==test1==", pMsg);
|
||||||
|
syncClientRequestBatchDestroyDeep(pMsg);
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
void test2() {
|
||||||
|
SyncClientRequest *pMsg = createMsg();
|
||||||
|
uint32_t len = pMsg->bytes;
|
||||||
|
char * serialized = (char *)taosMemoryMalloc(len);
|
||||||
|
syncClientRequestSerialize(pMsg, serialized, len);
|
||||||
|
SyncClientRequest *pMsg2 = syncClientRequestBuild(pMsg->dataLen);
|
||||||
|
syncClientRequestDeserialize(serialized, len, pMsg2);
|
||||||
|
syncClientRequestLog2((char *)"test2: syncClientRequestSerialize -> syncClientRequestDeserialize ", pMsg2);
|
||||||
|
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
syncClientRequestDestroy(pMsg);
|
||||||
|
syncClientRequestDestroy(pMsg2);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test3() {
|
||||||
|
SyncClientRequest *pMsg = createMsg();
|
||||||
|
uint32_t len;
|
||||||
|
char * serialized = syncClientRequestSerialize2(pMsg, &len);
|
||||||
|
SyncClientRequest *pMsg2 = syncClientRequestDeserialize2(serialized, len);
|
||||||
|
syncClientRequestLog2((char *)"test3: syncClientRequestSerialize3 -> syncClientRequestDeserialize2 ", pMsg2);
|
||||||
|
|
||||||
|
taosMemoryFree(serialized);
|
||||||
|
syncClientRequestDestroy(pMsg);
|
||||||
|
syncClientRequestDestroy(pMsg2);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test4() {
|
||||||
|
SyncClientRequest *pMsg = createMsg();
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncClientRequest2RpcMsg(pMsg, &rpcMsg);
|
||||||
|
SyncClientRequest *pMsg2 = (SyncClientRequest *)taosMemoryMalloc(rpcMsg.contLen);
|
||||||
|
syncClientRequestFromRpcMsg(&rpcMsg, pMsg2);
|
||||||
|
syncClientRequestLog2((char *)"test4: syncClientRequest2RpcMsg -> syncClientRequestFromRpcMsg ", pMsg2);
|
||||||
|
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
syncClientRequestDestroy(pMsg);
|
||||||
|
syncClientRequestDestroy(pMsg2);
|
||||||
|
}
|
||||||
|
|
||||||
|
void test5() {
|
||||||
|
SyncClientRequest *pMsg = createMsg();
|
||||||
|
SRpcMsg rpcMsg;
|
||||||
|
syncClientRequest2RpcMsg(pMsg, &rpcMsg);
|
||||||
|
SyncClientRequest *pMsg2 = syncClientRequestFromRpcMsg2(&rpcMsg);
|
||||||
|
syncClientRequestLog2((char *)"test5: syncClientRequest2RpcMsg -> syncClientRequestFromRpcMsg2 ", pMsg2);
|
||||||
|
|
||||||
|
rpcFreeCont(rpcMsg.pCont);
|
||||||
|
syncClientRequestDestroy(pMsg);
|
||||||
|
syncClientRequestDestroy(pMsg2);
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
|
int main() {
|
||||||
|
gRaftDetailLog = true;
|
||||||
|
tsAsyncLog = 0;
|
||||||
|
sDebugFlag = DEBUG_DEBUG + DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
||||||
|
logTest();
|
||||||
|
|
||||||
|
test1();
|
||||||
|
|
||||||
|
/*
|
||||||
|
test2();
|
||||||
|
test3();
|
||||||
|
test4();
|
||||||
|
test5();
|
||||||
|
*/
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
|
@ -77,7 +77,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) {
|
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void* pParam, void** ppReader) {
|
||||||
*ppReader = (void*)0xABCD;
|
*ppReader = (void*)0xABCD;
|
||||||
char logBuf[256] = {0};
|
char logBuf[256] = {0};
|
||||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader);
|
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader);
|
||||||
|
@ -114,7 +114,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void** ppWriter) {
|
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) {
|
||||||
*ppWriter = (void*)0xCDEF;
|
*ppWriter = (void*)0xCDEF;
|
||||||
char logBuf[256] = {0};
|
char logBuf[256] = {0};
|
||||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartWrite== pFsm:%p, *ppWriter:%p", pFsm, *ppWriter);
|
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartWrite== pFsm:%p, *ppWriter:%p", pFsm, *ppWriter);
|
||||||
|
@ -198,7 +198,7 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
|
||||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
|
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
|
||||||
syncInfo.pWal = pWal;
|
syncInfo.pWal = pWal;
|
||||||
syncInfo.isStandBy = isStandBy;
|
syncInfo.isStandBy = isStandBy;
|
||||||
syncInfo.snapshotEnable = true;
|
syncInfo.snapshotStrategy = SYNC_STRATEGY_STANDARD_SNAPSHOT;
|
||||||
|
|
||||||
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
||||||
|
|
||||||
|
|
|
@ -29,7 +29,7 @@ int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { return 0; }
|
||||||
int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; }
|
int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; }
|
||||||
int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; }
|
int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; }
|
||||||
|
|
||||||
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void** ppWriter) { return 0; }
|
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) { return 0; }
|
||||||
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; }
|
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; }
|
||||||
int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; }
|
int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; }
|
||||||
|
|
||||||
|
|
|
@ -25,7 +25,7 @@ void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta)
|
||||||
|
|
||||||
int32_t GetSnapshot(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; }
|
int32_t GetSnapshot(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; }
|
||||||
|
|
||||||
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) { return 0; }
|
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void* pParam, void** ppReader) { return 0; }
|
||||||
int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; }
|
int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; }
|
||||||
int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; }
|
int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; }
|
||||||
|
|
||||||
|
|
|
@ -74,7 +74,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) {
|
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void* pParam, void** ppReader) {
|
||||||
*ppReader = (void*)0xABCD;
|
*ppReader = (void*)0xABCD;
|
||||||
char logBuf[256] = {0};
|
char logBuf[256] = {0};
|
||||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader);
|
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader);
|
||||||
|
@ -111,7 +111,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void** ppWriter) {
|
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) {
|
||||||
*ppWriter = (void*)0xCDEF;
|
*ppWriter = (void*)0xCDEF;
|
||||||
char logBuf[256] = {0};
|
char logBuf[256] = {0};
|
||||||
|
|
||||||
|
@ -203,7 +203,7 @@ SWal* createWal(char* path, int32_t vgId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy,
|
int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal* pWal, char* path, bool isStandBy,
|
||||||
bool enableSnapshot) {
|
ESyncStrategy enableSnapshot) {
|
||||||
SSyncInfo syncInfo;
|
SSyncInfo syncInfo;
|
||||||
syncInfo.vgId = vgId;
|
syncInfo.vgId = vgId;
|
||||||
syncInfo.msgcb = &gSyncIO->msgcb;
|
syncInfo.msgcb = &gSyncIO->msgcb;
|
||||||
|
@ -213,7 +213,7 @@ int64_t createSyncNode(int32_t replicaNum, int32_t myIndex, int32_t vgId, SWal*
|
||||||
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
|
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s_sync_replica%d_index%d", path, replicaNum, myIndex);
|
||||||
syncInfo.pWal = pWal;
|
syncInfo.pWal = pWal;
|
||||||
syncInfo.isStandBy = isStandBy;
|
syncInfo.isStandBy = isStandBy;
|
||||||
syncInfo.snapshotEnable = enableSnapshot;
|
syncInfo.snapshotStrategy = enableSnapshot;
|
||||||
|
|
||||||
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
SSyncCfg* pCfg = &syncInfo.syncCfg;
|
||||||
|
|
||||||
|
@ -316,7 +316,7 @@ int main(int argc, char** argv) {
|
||||||
|
|
||||||
int32_t replicaNum = atoi(argv[1]);
|
int32_t replicaNum = atoi(argv[1]);
|
||||||
int32_t myIndex = atoi(argv[2]);
|
int32_t myIndex = atoi(argv[2]);
|
||||||
bool enableSnapshot = atoi(argv[3]);
|
ESyncStrategy enableSnapshot = (ESyncStrategy)atoi(argv[3]);
|
||||||
int32_t lastApplyIndex = atoi(argv[4]);
|
int32_t lastApplyIndex = atoi(argv[4]);
|
||||||
int32_t lastApplyTerm = atoi(argv[5]);
|
int32_t lastApplyTerm = atoi(argv[5]);
|
||||||
int32_t writeRecordNum = atoi(argv[6]);
|
int32_t writeRecordNum = atoi(argv[6]);
|
||||||
|
|
|
@ -433,6 +433,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RECONFIG_NOT_READY, "Sync not ready for re
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for propose")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for propose")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for standby")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for standby")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error")
|
||||||
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_TIMEOUT, "Sync timeout")
|
||||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
|
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
|
||||||
|
|
||||||
// wal
|
// wal
|
||||||
|
|
Loading…
Reference in New Issue