Merge pull request #14430 from taosdata/feature/3.0_mhli
refactor(sync): add snapshot2 interface
This commit is contained in:
commit
e7729ee3a3
|
@ -233,10 +233,12 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_BATCH, "sync-client-request-batch", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_REPLY, "sync-client-request-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE, "sync-request-vote", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE_REPLY, "sync-request-vote-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES, "sync-append-entries", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES_BATCH, "sync-append-entries-batch", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_APPEND_ENTRIES_REPLY, "sync-append-entries-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_NOOP, "sync-noop", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_UNKNOWN, "sync-unknown", NULL, NULL)
|
||||
|
|
|
@ -26,6 +26,7 @@ extern "C" {
|
|||
|
||||
extern bool gRaftDetailLog;
|
||||
|
||||
#define SYNC_MAX_BATCH_SIZE 100
|
||||
#define SYNC_INDEX_BEGIN 0
|
||||
#define SYNC_INDEX_INVALID -1
|
||||
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
|
||||
|
@ -120,7 +121,7 @@ typedef struct SSyncFSM {
|
|||
int32_t (*FpGetSnapshot)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot, void* pReaderParam, void** ppReader);
|
||||
int32_t (*FpGetSnapshotInfo)(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
||||
|
||||
int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void** ppReader);
|
||||
int32_t (*FpSnapshotStartRead)(struct SSyncFSM* pFsm, void* pReaderParam, void** ppReader);
|
||||
int32_t (*FpSnapshotStopRead)(struct SSyncFSM* pFsm, void* pReader);
|
||||
int32_t (*FpSnapshotDoRead)(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len);
|
||||
|
||||
|
@ -164,6 +165,7 @@ typedef struct SSyncLogStore {
|
|||
bool (*syncLogIsEmpty)(struct SSyncLogStore* pLogStore);
|
||||
int32_t (*syncLogEntryCount)(struct SSyncLogStore* pLogStore);
|
||||
int32_t (*syncLogRestoreFromSnapshot)(struct SSyncLogStore* pLogStore, SyncIndex index);
|
||||
bool (*syncLogExist)(struct SSyncLogStore* pLogStore, SyncIndex index);
|
||||
|
||||
SyncIndex (*syncLogWriteIndex)(struct SSyncLogStore* pLogStore);
|
||||
SyncIndex (*syncLogLastIndex)(struct SSyncLogStore* pLogStore);
|
||||
|
@ -179,6 +181,7 @@ typedef struct SSyncInfo {
|
|||
bool isStandBy;
|
||||
bool snapshotEnable;
|
||||
SyncGroupId vgId;
|
||||
int32_t batchSize;
|
||||
SSyncCfg syncCfg;
|
||||
char path[TSDB_FILENAME_LEN];
|
||||
SWal* pWal;
|
||||
|
@ -202,6 +205,7 @@ SyncGroupId syncGetVgId(int64_t rid);
|
|||
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
|
||||
// int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize);
|
||||
bool syncEnvIsStart();
|
||||
const char* syncStr(ESyncState state);
|
||||
bool syncIsRestoreFinish(int64_t rid);
|
||||
|
|
|
@ -219,6 +219,34 @@ void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg);
|
|||
void syncClientRequestLog(const SyncClientRequest* pMsg);
|
||||
void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef struct SOffsetAndContLen {
|
||||
int32_t offset;
|
||||
int32_t contLen;
|
||||
} SOffsetAndContLen;
|
||||
|
||||
typedef struct SRaftMeta {
|
||||
uint64_t seqNum;
|
||||
bool isWeak;
|
||||
} SRaftMeta;
|
||||
|
||||
// block1:
|
||||
// block2: SRaftMeta array
|
||||
// block3: rpc msg array (with pCont)
|
||||
|
||||
typedef struct SyncClientRequestBatch {
|
||||
uint32_t bytes;
|
||||
int32_t vgId;
|
||||
uint32_t msgType; // SyncClientRequestBatch msgType
|
||||
uint32_t dataCount;
|
||||
uint32_t dataLen; // user RpcMsg.contLen
|
||||
char data[]; // user RpcMsg.pCont
|
||||
} SyncClientRequestBatch;
|
||||
|
||||
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize,
|
||||
int32_t vgId);
|
||||
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef struct SyncClientRequestReply {
|
||||
uint32_t bytes;
|
||||
|
@ -325,22 +353,53 @@ void syncAppendEntriesLog(const SyncAppendEntries* pMsg);
|
|||
void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
|
||||
// define ahead
|
||||
/*
|
||||
typedef struct SOffsetAndContLen {
|
||||
int32_t offset;
|
||||
int32_t contLen;
|
||||
} SOffsetAndContLen;
|
||||
*/
|
||||
|
||||
typedef struct SyncAppendEntriesBatch {
|
||||
uint32_t bytes;
|
||||
int32_t vgId;
|
||||
uint32_t msgType;
|
||||
SRaftId srcId;
|
||||
SRaftId destId;
|
||||
|
||||
// private data
|
||||
SyncTerm term;
|
||||
SyncIndex prevLogIndex;
|
||||
SyncTerm prevLogTerm;
|
||||
SyncIndex commitIndex;
|
||||
SyncTerm privateTerm;
|
||||
int32_t dataCount;
|
||||
uint32_t dataLen;
|
||||
char data[];
|
||||
} SyncAppendEntriesBatch;
|
||||
|
||||
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId);
|
||||
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg);
|
||||
void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen);
|
||||
void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg);
|
||||
char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len);
|
||||
SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len);
|
||||
void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg);
|
||||
void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg);
|
||||
SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg);
|
||||
cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg);
|
||||
char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg);
|
||||
void syncAppendEntriesBatch2RpcMsgArray(SyncAppendEntriesBatch* pSyncMsg, SRpcMsg* rpcMsgArr, int32_t maxArrSize,
|
||||
int32_t* pRetArrSize);
|
||||
|
||||
// for debug ----------------------
|
||||
void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg);
|
||||
void syncAppendEntriesBatchPrint2(char* s, const SyncAppendEntriesBatch* pMsg);
|
||||
void syncAppendEntriesBatchLog(const SyncAppendEntriesBatch* pMsg);
|
||||
void syncAppendEntriesBatchLog2(char* s, const SyncAppendEntriesBatch* pMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef struct SyncAppendEntriesReply {
|
||||
uint32_t bytes;
|
||||
|
@ -542,6 +601,7 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
|
|||
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
|
||||
int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
|
||||
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex);
|
||||
int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* pMsg);
|
||||
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
|
||||
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
|
||||
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
|
||||
|
|
|
@ -423,6 +423,7 @@ int32_t* taosGetErrno();
|
|||
#define TSDB_CODE_SYN_RECONFIG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0910)
|
||||
#define TSDB_CODE_SYN_PROPOSE_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0911)
|
||||
#define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912)
|
||||
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
|
||||
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
|
||||
|
||||
// tq
|
||||
|
|
|
@ -117,7 +117,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
|
|||
}
|
||||
}
|
||||
|
||||
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) {
|
||||
int32_t mndSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
|
||||
mDebug("start to read snapshot from sdb");
|
||||
SMnode *pMnode = pFsm->data;
|
||||
return sdbStartRead(pMnode->pSdb, (SSdbIter **)ppReader, NULL, NULL, NULL);
|
||||
|
|
|
@ -119,7 +119,7 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
|
|||
}
|
||||
|
||||
void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnode * pVnode = pInfo->ahandle;
|
||||
SVnode *pVnode = pInfo->ahandle;
|
||||
int32_t vgId = pVnode->config.vgId;
|
||||
int32_t code = 0;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
@ -199,7 +199,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
|||
}
|
||||
|
||||
void vnodeApplyMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
|
||||
SVnode * pVnode = pInfo->ahandle;
|
||||
SVnode *pVnode = pInfo->ahandle;
|
||||
int32_t vgId = pVnode->config.vgId;
|
||||
int32_t code = 0;
|
||||
SRpcMsg *pMsg = NULL;
|
||||
|
@ -240,7 +240,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
|||
STraceId *trace = &pMsg->info.traceId;
|
||||
|
||||
do {
|
||||
char * syncNodeStr = sync2SimpleStr(pVnode->sync);
|
||||
char *syncNodeStr = sync2SimpleStr(pVnode->sync);
|
||||
static int64_t vndTick = 0;
|
||||
if (++vndTick % 10 == 1) {
|
||||
vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
|
||||
|
@ -375,7 +375,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
|
|||
}
|
||||
|
||||
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
|
||||
SVnode * pVnode = pFsm->data;
|
||||
SVnode *pVnode = pFsm->data;
|
||||
SSnapshot snapshot = {0};
|
||||
SyncIndex beginIndex = SYNC_INDEX_INVALID;
|
||||
char logBuf[256] = {0};
|
||||
|
@ -409,7 +409,7 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
|
|||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||
}
|
||||
|
||||
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void **ppReader) { return 0; }
|
||||
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { return 0; }
|
||||
|
||||
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; }
|
||||
|
||||
|
|
|
@ -94,6 +94,7 @@ extern "C" {
|
|||
//
|
||||
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
|
||||
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg);
|
||||
int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -42,6 +42,12 @@ extern "C" {
|
|||
//
|
||||
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
||||
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
||||
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
|
||||
|
||||
typedef struct SReaderParam {
|
||||
SyncIndex start;
|
||||
SyncIndex end;
|
||||
} SReaderParam;
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -67,6 +67,7 @@ typedef struct SSyncNode {
|
|||
char path[TSDB_FILENAME_LEN];
|
||||
char raftStorePath[TSDB_FILENAME_LEN * 2];
|
||||
char configPath[TSDB_FILENAME_LEN * 2];
|
||||
int32_t batchSize;
|
||||
|
||||
// sync io
|
||||
SWal* pWal;
|
||||
|
@ -170,6 +171,7 @@ void syncNodeStart(SSyncNode* pSyncNode);
|
|||
void syncNodeStartStandBy(SSyncNode* pSyncNode);
|
||||
void syncNodeClose(SSyncNode* pSyncNode);
|
||||
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
|
||||
int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize);
|
||||
|
||||
// option
|
||||
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
|
||||
|
|
|
@ -53,8 +53,10 @@ extern "C" {
|
|||
//
|
||||
int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeReplicate(SSyncNode* pSyncNode);
|
||||
int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg);
|
||||
int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntriesBatch* pMsg);
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -40,8 +40,8 @@ typedef struct SSyncSnapshotSender {
|
|||
bool start;
|
||||
int32_t seq;
|
||||
int32_t ack;
|
||||
void *pReader;
|
||||
void *pCurrentBlock;
|
||||
void * pReader;
|
||||
void * pCurrentBlock;
|
||||
int32_t blockLen;
|
||||
SSnapshot snapshot;
|
||||
SSyncCfg lastConfig;
|
||||
|
@ -62,14 +62,14 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender);
|
|||
int32_t snapshotReSend(SSyncSnapshotSender *pSender);
|
||||
|
||||
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
|
||||
char *snapshotSender2Str(SSyncSnapshotSender *pSender);
|
||||
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
|
||||
char * snapshotSender2Str(SSyncSnapshotSender *pSender);
|
||||
char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
|
||||
|
||||
//---------------------------------------------------
|
||||
typedef struct SSyncSnapshotReceiver {
|
||||
bool start;
|
||||
int32_t ack;
|
||||
void *pWriter;
|
||||
void * pWriter;
|
||||
SyncTerm term;
|
||||
SyncTerm privateTerm;
|
||||
SSnapshot snapshot;
|
||||
|
@ -85,8 +85,8 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
|
|||
void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
|
||||
|
||||
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
|
||||
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
|
||||
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
|
||||
char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
|
||||
char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
|
||||
|
||||
//---------------------------------------------------
|
||||
// on message
|
||||
|
|
|
@ -719,6 +719,8 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries
|
|||
return false;
|
||||
}
|
||||
|
||||
int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg) { return 0; }
|
||||
|
||||
int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg) {
|
||||
int32_t ret = 0;
|
||||
int32_t code = 0;
|
||||
|
|
|
@ -108,48 +108,82 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
|
|||
return ret;
|
||||
}
|
||||
|
||||
#if 0
|
||||
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
|
||||
// only start once
|
||||
static void syncNodeStartSnapshot(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm,
|
||||
SyncAppendEntriesReply* pMsg) {
|
||||
// get sender
|
||||
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
|
||||
ASSERT(pSender != NULL);
|
||||
|
||||
SSnapshot snapshot = {
|
||||
.data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID};
|
||||
|
||||
void* pReader = NULL;
|
||||
SReaderParam readerParam = {.start = beginIndex, .end = endIndex};
|
||||
ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader);
|
||||
if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) {
|
||||
ASSERT(pReader != NULL);
|
||||
snapshotSenderStart(pSender, snapshot, pReader);
|
||||
|
||||
} else {
|
||||
if (pReader != NULL) {
|
||||
ths->pFsm->FpSnapshotStopRead(ths->pFsm, pReader);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
|
||||
int32_t ret = 0;
|
||||
|
||||
char logBuf[128] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesReplyCb== term:%lu", ths->pRaftStore->currentTerm);
|
||||
syncAppendEntriesReplyLog2(logBuf, pMsg);
|
||||
|
||||
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
||||
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
|
||||
ths->pRaftStore->currentTerm);
|
||||
return ret;
|
||||
// if already drop replica, do not process
|
||||
if (!syncNodeInRaftGroup(ths, &(pMsg->srcId)) && !ths->pRaftCfg->isStandBy) {
|
||||
syncNodeEventLog(ths, "recv sync-append-entries-reply, maybe replica already dropped");
|
||||
return -1;
|
||||
}
|
||||
|
||||
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex);
|
||||
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex);
|
||||
|
||||
// no need this code, because if I receive reply.term, then I must have sent for that term.
|
||||
// if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||
// syncNodeUpdateTerm(ths, pMsg->term);
|
||||
// }
|
||||
// drop stale response
|
||||
if (pMsg->term < ths->pRaftStore->currentTerm) {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, recv-term:%lu, drop stale response", pMsg->term);
|
||||
syncNodeEventLog(ths, logBuf);
|
||||
return -1;
|
||||
}
|
||||
|
||||
// error term
|
||||
if (pMsg->term > ths->pRaftStore->currentTerm) {
|
||||
char logBuf[128] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf), "syncNodeOnAppendEntriesReplyCb error term, receive:%lu current:%lu", pMsg->term,
|
||||
ths->pRaftStore->currentTerm);
|
||||
syncNodeLog2(logBuf, ths);
|
||||
sError("%s", logBuf);
|
||||
return ret;
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "recv sync-append-entries-reply, error term, recv-term:%lu", pMsg->term);
|
||||
syncNodeErrorLog(ths, logBuf);
|
||||
return -1;
|
||||
}
|
||||
|
||||
ASSERT(pMsg->term == ths->pRaftStore->currentTerm);
|
||||
|
||||
if (pMsg->success) {
|
||||
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
|
||||
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1);
|
||||
SyncIndex newNextIndex = pMsg->matchIndex + 1;
|
||||
SyncIndex newMatchIndex = pMsg->matchIndex;
|
||||
|
||||
// matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
|
||||
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
|
||||
if (ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex) &&
|
||||
ths->pLogStore->syncLogExist(ths->pLogStore, newNextIndex - 1)) {
|
||||
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
|
||||
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), newNextIndex);
|
||||
|
||||
// maybe commit
|
||||
syncMaybeAdvanceCommitIndex(ths);
|
||||
// matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
|
||||
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
|
||||
|
||||
// maybe commit
|
||||
if (ths->state == TAOS_SYNC_STATE_LEADER) {
|
||||
syncMaybeAdvanceCommitIndex(ths);
|
||||
}
|
||||
} else {
|
||||
// start snapshot <match+1, old snapshot.end>
|
||||
SSnapshot snapshot;
|
||||
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &snapshot);
|
||||
syncNodeStartSnapshot(ths, newMatchIndex + 1, snapshot.lastApplyIndex, snapshot.lastApplyTerm, pMsg);
|
||||
|
||||
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), snapshot.lastApplyIndex + 1);
|
||||
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
|
||||
}
|
||||
|
||||
} else {
|
||||
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
|
||||
|
@ -157,18 +191,35 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
|
|||
// notice! int64, uint64
|
||||
if (nextIndex > SYNC_INDEX_BEGIN) {
|
||||
--nextIndex;
|
||||
|
||||
if (ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex) &&
|
||||
ths->pLogStore->syncLogExist(ths->pLogStore, nextIndex - 1)) {
|
||||
// do nothing
|
||||
} else {
|
||||
SSyncRaftEntry* pEntry;
|
||||
int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, nextIndex, &pEntry);
|
||||
ASSERT(code == 0);
|
||||
syncNodeStartSnapshot(ths, SYNC_INDEX_BEGIN, nextIndex, pEntry->term, pMsg);
|
||||
|
||||
// get sender
|
||||
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
|
||||
ASSERT(pSender != NULL);
|
||||
SyncIndex sentryIndex = pSender->snapshot.lastApplyIndex + 1;
|
||||
|
||||
// update nextIndex to sentryIndex
|
||||
if (nextIndex <= sentryIndex) {
|
||||
nextIndex = sentryIndex;
|
||||
}
|
||||
}
|
||||
|
||||
} else {
|
||||
nextIndex = SYNC_INDEX_BEGIN;
|
||||
}
|
||||
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), nextIndex);
|
||||
}
|
||||
|
||||
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex);
|
||||
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex);
|
||||
|
||||
return ret;
|
||||
return 0;
|
||||
}
|
||||
#endif
|
||||
|
||||
int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
|
||||
int32_t ret = 0;
|
||||
|
|
|
@ -50,7 +50,6 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths);
|
|||
// process message ----
|
||||
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
|
||||
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
|
||||
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex);
|
||||
|
||||
// life cycle
|
||||
static void syncFreeNode(void* param);
|
||||
|
@ -627,6 +626,94 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) {
|
||||
if (arrSize < 0) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t ret = 0;
|
||||
SSyncNode* pSyncNode = taosAcquireRef(tsNodeRefId, rid);
|
||||
if (pSyncNode == NULL) {
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
ASSERT(rid == pSyncNode->rid);
|
||||
ret = syncNodeProposeBatch(pSyncNode, pMsgArr, pIsWeakArr, arrSize);
|
||||
|
||||
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
|
||||
return ret;
|
||||
}
|
||||
|
||||
static bool syncNodeBatchOK(SRpcMsg* pMsgArr, int32_t arrSize) {
|
||||
for (int32_t i = 0; i < arrSize; ++i) {
|
||||
if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (pMsgArr[i].msgType == TDMT_SYNC_CONFIG_CHANGE_FINISH) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) {
|
||||
if (!syncNodeBatchOK(pMsgArr, arrSize)) {
|
||||
syncNodeErrorLog(pSyncNode, "sync propose batch error");
|
||||
terrno = TSDB_CODE_SYN_BATCH_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (arrSize > SYNC_MAX_BATCH_SIZE) {
|
||||
syncNodeErrorLog(pSyncNode, "sync propose match batch error");
|
||||
terrno = TSDB_CODE_SYN_BATCH_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||
syncNodeErrorLog(pSyncNode, "sync propose not leader");
|
||||
terrno = TSDB_CODE_SYN_NOT_LEADER;
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (pSyncNode->changing) {
|
||||
syncNodeErrorLog(pSyncNode, "sync propose not ready");
|
||||
terrno = TSDB_CODE_SYN_PROPOSE_NOT_READY;
|
||||
return -1;
|
||||
}
|
||||
|
||||
SRaftMeta raftArr[SYNC_MAX_BATCH_SIZE];
|
||||
for (int i = 0; i < arrSize; ++i) {
|
||||
SRespStub stub;
|
||||
stub.createTime = taosGetTimestampMs();
|
||||
stub.rpcMsg = pMsgArr[i];
|
||||
uint64_t seqNum = syncRespMgrAdd(pSyncNode->pSyncRespMgr, &stub);
|
||||
|
||||
raftArr[i].isWeak = pIsWeakArr[i];
|
||||
raftArr[i].seqNum = seqNum;
|
||||
}
|
||||
|
||||
SyncClientRequestBatch* pSyncMsg = syncClientRequestBatchBuild(pMsgArr, raftArr, arrSize, pSyncNode->vgId);
|
||||
ASSERT(pSyncMsg != NULL);
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncClientRequestBatch2RpcMsg(pSyncMsg, &rpcMsg);
|
||||
taosMemoryFree(pSyncMsg); // only free msg body, do not free rpc msg content
|
||||
|
||||
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
|
||||
// enqueue msg ok
|
||||
|
||||
} else {
|
||||
sError("enqueue msg error, FpEqMsg is NULL");
|
||||
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
|
||||
return -1;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
||||
int32_t ret = 0;
|
||||
|
||||
|
@ -2362,6 +2449,49 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncNodeOnClientRequestBatchCb(SSyncNode* ths, SyncClientRequestBatch* pMsg) {
|
||||
int32_t code = 0;
|
||||
|
||||
if (ths->state != TAOS_SYNC_STATE_LEADER) {
|
||||
// call FpCommitCb, delete resp mgr
|
||||
return -1;
|
||||
}
|
||||
|
||||
SyncIndex index = ths->pLogStore->syncLogWriteIndex(ths->pLogStore);
|
||||
SyncTerm term = ths->pRaftStore->currentTerm;
|
||||
|
||||
int32_t raftMetaArrayLen = sizeof(SRaftMeta) * pMsg->dataCount;
|
||||
int32_t rpcArrayLen = sizeof(SRpcMsg) * pMsg->dataCount;
|
||||
SRaftMeta* raftMetaArr = (SRaftMeta*)(pMsg->data);
|
||||
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen);
|
||||
for (int32_t i = 0; i < pMsg->dataCount; ++i) {
|
||||
SSyncRaftEntry* pEntry = syncEntryBuild(msgArr[i].contLen);
|
||||
ASSERT(pEntry != NULL);
|
||||
|
||||
pEntry->originalRpcType = msgArr[i].msgType;
|
||||
pEntry->seqNum = raftMetaArr[i].seqNum;
|
||||
pEntry->isWeak = raftMetaArr[i].isWeak;
|
||||
pEntry->term = term;
|
||||
pEntry->index = index;
|
||||
memcpy(pEntry->data, msgArr[i].pCont, msgArr[i].contLen);
|
||||
ASSERT(msgArr[i].contLen == pEntry->dataLen);
|
||||
|
||||
code = ths->pLogStore->syncLogAppendEntry(ths->pLogStore, pEntry);
|
||||
if (code != 0) {
|
||||
// del resp mgr, call FpCommitCb
|
||||
ASSERT(0);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
// fsync once
|
||||
SSyncLogStoreData* pData = ths->pLogStore->data;
|
||||
SWal* pWal = pData->pWal;
|
||||
walFsync(pWal, true);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void syncFreeNode(void* param) {
|
||||
SSyncNode* pNode = param;
|
||||
// inner object already free
|
||||
|
|
|
@ -591,7 +591,7 @@ void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLe
|
|||
void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
ASSERT(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen);
|
||||
ASSERT(pMsg->bytes == sizeof(SyncPingReply) + pMsg->dataLen);
|
||||
}
|
||||
|
||||
char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len) {
|
||||
|
@ -956,6 +956,48 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
// ---- message process SyncClientRequestBatch----
|
||||
|
||||
// block1:
|
||||
// block2: SRaftMeta array
|
||||
// block3: rpc msg array (with pCont)
|
||||
|
||||
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize,
|
||||
int32_t vgId) {
|
||||
ASSERT(rpcMsgArr != NULL);
|
||||
ASSERT(arrSize > 0);
|
||||
|
||||
int32_t dataLen = 0;
|
||||
int32_t raftMetaArrayLen = sizeof(SRaftMeta) * arrSize;
|
||||
int32_t rpcArrayLen = sizeof(SRpcMsg) * arrSize;
|
||||
dataLen += (raftMetaArrayLen + rpcArrayLen);
|
||||
|
||||
uint32_t bytes = sizeof(SyncClientRequestBatch) + dataLen;
|
||||
SyncClientRequestBatch* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST_BATCH;
|
||||
pMsg->dataCount = arrSize;
|
||||
pMsg->dataLen = dataLen;
|
||||
|
||||
SRaftMeta* raftMetaArr = (SRaftMeta*)(pMsg->data);
|
||||
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen);
|
||||
|
||||
for (int i = 0; i < arrSize; ++i) {
|
||||
// init raftMetaArr
|
||||
raftMetaArr[i].isWeak = raftArr[i].isWeak;
|
||||
raftMetaArr[i].seqNum = raftArr[i].seqNum;
|
||||
|
||||
// init msgArr
|
||||
msgArr[i] = rpcMsgArr[i];
|
||||
}
|
||||
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg) {}
|
||||
|
||||
// ---- message process SyncRequestVote----
|
||||
SyncRequestVote* syncRequestVoteBuild(int32_t vgId) {
|
||||
uint32_t bytes = sizeof(SyncRequestVote);
|
||||
|
@ -1426,6 +1468,279 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
// ---- message process SyncAppendEntriesBatch----
|
||||
|
||||
// block1: SOffsetAndContLen
|
||||
// block2: SOffsetAndContLen Array
|
||||
// block3: SRpcMsg Array
|
||||
// block4: SRpcMsg pCont Array
|
||||
|
||||
SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SRpcMsg* rpcMsgArr, int32_t arrSize, int32_t vgId) {
|
||||
ASSERT(rpcMsgArr != NULL);
|
||||
ASSERT(arrSize > 0);
|
||||
|
||||
int32_t dataLen = 0;
|
||||
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * arrSize; // <offset, contLen>
|
||||
int32_t rpcArrayLen = sizeof(SRpcMsg) * arrSize; // SRpcMsg
|
||||
int32_t contArrayLen = 0;
|
||||
for (int i = 0; i < arrSize; ++i) { // SRpcMsg pCont
|
||||
contArrayLen += rpcMsgArr[i].contLen;
|
||||
}
|
||||
dataLen += (metaArrayLen + rpcArrayLen + contArrayLen);
|
||||
|
||||
uint32_t bytes = sizeof(SyncAppendEntriesBatch) + dataLen;
|
||||
SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->msgType = TDMT_SYNC_APPEND_ENTRIES_BATCH;
|
||||
pMsg->dataCount = arrSize;
|
||||
pMsg->dataLen = dataLen;
|
||||
|
||||
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data);
|
||||
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + metaArrayLen);
|
||||
char* pData = pMsg->data;
|
||||
|
||||
for (int i = 0; i < arrSize; ++i) {
|
||||
// init <offset, contLen>
|
||||
if (i == 0) {
|
||||
metaArr[i].offset = metaArrayLen + rpcArrayLen;
|
||||
metaArr[i].contLen = rpcMsgArr[i].contLen;
|
||||
} else {
|
||||
metaArr[i].offset = metaArr[i - 1].offset + metaArr[i - 1].contLen;
|
||||
metaArr[i].contLen = rpcMsgArr[i].contLen;
|
||||
}
|
||||
|
||||
// init msgArr
|
||||
msgArr[i] = rpcMsgArr[i];
|
||||
|
||||
// init data
|
||||
ASSERT(rpcMsgArr[i].contLen == metaArr[i].contLen);
|
||||
memcpy(pData + metaArr[i].offset, rpcMsgArr[i].pCont, rpcMsgArr[i].contLen);
|
||||
}
|
||||
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg) {
|
||||
if (pMsg != NULL) {
|
||||
taosMemoryFree(pMsg);
|
||||
}
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen) {
|
||||
ASSERT(pMsg->bytes <= bufLen);
|
||||
memcpy(buf, pMsg, pMsg->bytes);
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg) {
|
||||
memcpy(pMsg, buf, len);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
ASSERT(pMsg->bytes == sizeof(SyncAppendEntriesBatch) + pMsg->dataLen);
|
||||
}
|
||||
|
||||
char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len) {
|
||||
char* buf = taosMemoryMalloc(pMsg->bytes);
|
||||
ASSERT(buf != NULL);
|
||||
syncAppendEntriesBatchSerialize(pMsg, buf, pMsg->bytes);
|
||||
if (len != NULL) {
|
||||
*len = pMsg->bytes;
|
||||
}
|
||||
return buf;
|
||||
}
|
||||
|
||||
SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len) {
|
||||
uint32_t bytes = *((uint32_t*)buf);
|
||||
SyncAppendEntriesBatch* pMsg = taosMemoryMalloc(bytes);
|
||||
ASSERT(pMsg != NULL);
|
||||
syncAppendEntriesBatchDeserialize(buf, len, pMsg);
|
||||
ASSERT(len == pMsg->bytes);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg) {
|
||||
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
|
||||
pRpcMsg->msgType = pMsg->msgType;
|
||||
pRpcMsg->contLen = pMsg->bytes;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
syncAppendEntriesBatchSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg) {
|
||||
syncAppendEntriesBatchDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
|
||||
}
|
||||
|
||||
SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg) {
|
||||
SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
|
||||
ASSERT(pMsg != NULL);
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
cJSON* syncAppendEntriesBatch2Json(const SyncAppendEntriesBatch* pMsg) {
|
||||
char u64buf[128] = {0};
|
||||
cJSON* pRoot = cJSON_CreateObject();
|
||||
|
||||
if (pMsg != NULL) {
|
||||
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
|
||||
cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
|
||||
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
|
||||
|
||||
cJSON* pSrcId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->srcId.addr);
|
||||
cJSON_AddStringToObject(pSrcId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->srcId.addr;
|
||||
cJSON* pTmp = pSrcId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
|
||||
|
||||
cJSON* pDestId = cJSON_CreateObject();
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->destId.addr);
|
||||
cJSON_AddStringToObject(pDestId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pMsg->destId.addr;
|
||||
cJSON* pTmp = pDestId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
cJSON_AddStringToObject(pTmp, "addr_host", host);
|
||||
cJSON_AddNumberToObject(pTmp, "addr_port", port);
|
||||
}
|
||||
cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
|
||||
cJSON_AddItemToObject(pRoot, "destId", pDestId);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->term);
|
||||
cJSON_AddStringToObject(pRoot, "term", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->prevLogIndex);
|
||||
cJSON_AddStringToObject(pRoot, "prevLogIndex", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->prevLogTerm);
|
||||
cJSON_AddStringToObject(pRoot, "prevLogTerm", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%ld", pMsg->commitIndex);
|
||||
cJSON_AddStringToObject(pRoot, "commitIndex", u64buf);
|
||||
|
||||
snprintf(u64buf, sizeof(u64buf), "%lu", pMsg->privateTerm);
|
||||
cJSON_AddStringToObject(pRoot, "privateTerm", u64buf);
|
||||
|
||||
cJSON_AddNumberToObject(pRoot, "dataCount", pMsg->dataCount);
|
||||
cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
|
||||
|
||||
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pMsg->dataCount; // <offset, contLen>
|
||||
int32_t rpcArrayLen = sizeof(SRpcMsg) * pMsg->dataCount; // SRpcMsg
|
||||
int32_t contArrayLen = pMsg->dataLen - metaArrayLen - rpcArrayLen;
|
||||
|
||||
cJSON_AddNumberToObject(pRoot, "metaArrayLen", metaArrayLen);
|
||||
cJSON_AddNumberToObject(pRoot, "rpcArrayLen", rpcArrayLen);
|
||||
cJSON_AddNumberToObject(pRoot, "contArrayLen", contArrayLen);
|
||||
|
||||
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pMsg->data);
|
||||
SRpcMsg* msgArr = (SRpcMsg*)(pMsg->data + metaArrayLen);
|
||||
void* pData = (void*)(pMsg->data + metaArrayLen + rpcArrayLen);
|
||||
|
||||
cJSON* pMetaArr = cJSON_CreateArray();
|
||||
cJSON_AddItemToObject(pRoot, "metaArr", pMetaArr);
|
||||
for (int i = 0; i < pMsg->dataCount; ++i) {
|
||||
cJSON* pMeta = cJSON_CreateObject();
|
||||
cJSON_AddNumberToObject(pMeta, "offset", metaArr[i].offset);
|
||||
cJSON_AddNumberToObject(pMeta, "contLen", metaArr[i].contLen);
|
||||
cJSON_AddItemToArray(pMetaArr, pMeta);
|
||||
}
|
||||
|
||||
cJSON* pMsgArr = cJSON_CreateArray();
|
||||
cJSON_AddItemToObject(pRoot, "msgArr", pMsgArr);
|
||||
for (int i = 0; i < pMsg->dataCount; ++i) {
|
||||
cJSON* pRpcMsgJson = cJSON_CreateObject();
|
||||
cJSON_AddNumberToObject(pRpcMsgJson, "code", msgArr[i].code);
|
||||
cJSON_AddNumberToObject(pRpcMsgJson, "contLen", msgArr[i].contLen);
|
||||
cJSON_AddNumberToObject(pRpcMsgJson, "msgType", msgArr[i].msgType);
|
||||
cJSON_AddItemToArray(pMsgArr, pRpcMsgJson);
|
||||
}
|
||||
|
||||
char* s;
|
||||
s = syncUtilprintBin((char*)(pMsg->data), pMsg->dataLen);
|
||||
cJSON_AddStringToObject(pRoot, "data", s);
|
||||
taosMemoryFree(s);
|
||||
s = syncUtilprintBin2((char*)(pMsg->data), pMsg->dataLen);
|
||||
cJSON_AddStringToObject(pRoot, "data2", s);
|
||||
taosMemoryFree(s);
|
||||
}
|
||||
|
||||
cJSON* pJson = cJSON_CreateObject();
|
||||
cJSON_AddItemToObject(pJson, "SyncAppendEntriesBatch", pRoot);
|
||||
return pJson;
|
||||
}
|
||||
|
||||
char* syncAppendEntriesBatch2Str(const SyncAppendEntriesBatch* pMsg) {
|
||||
cJSON* pJson = syncAppendEntriesBatch2Json(pMsg);
|
||||
char* serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatch2RpcMsgArray(SyncAppendEntriesBatch* pSyncMsg, SRpcMsg* rpcMsgArr, int32_t maxArrSize,
|
||||
int32_t* pRetArrSize) {
|
||||
if (pRetArrSize != NULL) {
|
||||
*pRetArrSize = pSyncMsg->dataCount;
|
||||
}
|
||||
|
||||
int32_t arrSize = pSyncMsg->dataCount;
|
||||
if (arrSize > maxArrSize) {
|
||||
arrSize = maxArrSize;
|
||||
}
|
||||
|
||||
int32_t metaArrayLen = sizeof(SOffsetAndContLen) * pSyncMsg->dataCount; // <offset, contLen>
|
||||
int32_t rpcArrayLen = sizeof(SRpcMsg) * pSyncMsg->dataCount; // SRpcMsg
|
||||
int32_t contArrayLen = pSyncMsg->dataLen - metaArrayLen - rpcArrayLen;
|
||||
|
||||
SOffsetAndContLen* metaArr = (SOffsetAndContLen*)(pSyncMsg->data);
|
||||
SRpcMsg* msgArr = (SRpcMsg*)(pSyncMsg->data + metaArrayLen);
|
||||
void* pData = pSyncMsg->data + metaArrayLen + rpcArrayLen;
|
||||
|
||||
for (int i = 0; i < arrSize; ++i) {
|
||||
rpcMsgArr[i] = msgArr[i];
|
||||
rpcMsgArr[i].pCont = rpcMallocCont(msgArr[i].contLen);
|
||||
void* pRpcCont = pSyncMsg->data + metaArr[i].offset;
|
||||
memcpy(rpcMsgArr[i].pCont, pRpcCont, rpcMsgArr[i].contLen);
|
||||
}
|
||||
}
|
||||
|
||||
// for debug ----------------------
|
||||
void syncAppendEntriesBatchPrint(const SyncAppendEntriesBatch* pMsg) {
|
||||
char* serialized = syncAppendEntriesBatch2Str(pMsg);
|
||||
printf("syncAppendEntriesBatchPrint | len:%lu | %s \n", strlen(serialized), serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatchPrint2(char* s, const SyncAppendEntriesBatch* pMsg) {
|
||||
char* serialized = syncAppendEntriesBatch2Str(pMsg);
|
||||
printf("syncAppendEntriesBatchPrint2 | len:%lu | %s | %s \n", strlen(serialized), s, serialized);
|
||||
fflush(NULL);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatchLog(const SyncAppendEntriesBatch* pMsg) {
|
||||
char* serialized = syncAppendEntriesBatch2Str(pMsg);
|
||||
sTrace("syncAppendEntriesBatchLog | len:%lu | %s", strlen(serialized), serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
|
||||
void syncAppendEntriesBatchLog2(char* s, const SyncAppendEntriesBatch* pMsg) {
|
||||
if (gRaftDetailLog) {
|
||||
char* serialized = syncAppendEntriesBatch2Str(pMsg);
|
||||
sTraceLong("syncAppendEntriesBatchLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
|
||||
taosMemoryFree(serialized);
|
||||
}
|
||||
}
|
||||
|
||||
// ---- message process SyncAppendEntriesReply----
|
||||
SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId) {
|
||||
uint32_t bytes = sizeof(SyncAppendEntriesReply);
|
||||
|
|
|
@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
|
|||
|
||||
char *syncCfg2Str(SSyncCfg *pSyncCfg) {
|
||||
cJSON *pJson = syncCfg2Json(pSyncCfg);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
|
|||
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
|
||||
if (pSyncCfg != NULL) {
|
||||
int32_t len = 512;
|
||||
char *s = taosMemoryMalloc(len);
|
||||
char * s = taosMemoryMalloc(len);
|
||||
memset(s, 0, len);
|
||||
|
||||
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
|
||||
|
@ -205,7 +205,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
|
|||
|
||||
char *raftCfg2Str(SRaftCfg *pRaftCfg) {
|
||||
cJSON *pJson = raftCfg2Json(pRaftCfg);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
@ -280,7 +280,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
|
|||
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
|
||||
}
|
||||
|
||||
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
||||
cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
|
||||
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
|
||||
ASSERT(code == 0);
|
||||
|
||||
|
|
|
@ -116,6 +116,73 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncNodeAppendEntriesPeersSnapshot2(SSyncNode* pSyncNode) {
|
||||
if (pSyncNode->state != TAOS_SYNC_STATE_LEADER) {
|
||||
return -1;
|
||||
}
|
||||
|
||||
int32_t ret = 0;
|
||||
for (int i = 0; i < pSyncNode->peersNum; ++i) {
|
||||
SRaftId* pDestId = &(pSyncNode->peersId[i]);
|
||||
|
||||
// next index
|
||||
SyncIndex nextIndex = syncIndexMgrGetIndex(pSyncNode->pNextIndex, pDestId);
|
||||
|
||||
// pre index, pre term
|
||||
SyncIndex preLogIndex = syncNodeGetPreIndex(pSyncNode, nextIndex);
|
||||
SyncTerm preLogTerm = syncNodeGetPreTerm(pSyncNode, nextIndex);
|
||||
if (preLogTerm == SYNC_TERM_INVALID) {
|
||||
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(pSyncNode, pDestId);
|
||||
ASSERT(pSender != NULL);
|
||||
ASSERT(!snapshotSenderIsStart(pSender));
|
||||
|
||||
SyncIndex newNextIndex = syncNodeGetLastIndex(pSyncNode) + 1;
|
||||
syncIndexMgrSetIndex(pSyncNode->pNextIndex, pDestId, newNextIndex);
|
||||
syncIndexMgrSetIndex(pSyncNode->pMatchIndex, pDestId, SYNC_INDEX_INVALID);
|
||||
sError("vgId:%d sync get pre term error, nextIndex:%ld, update next-index:%ld, match-index:%d, raftid:%ld",
|
||||
pSyncNode->vgId, nextIndex, newNextIndex, SYNC_INDEX_INVALID, pDestId->addr);
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
SRpcMsg rpcMsgArr[SYNC_MAX_BATCH_SIZE];
|
||||
memset(rpcMsgArr, 0, sizeof(rpcMsgArr));
|
||||
|
||||
int32_t getCount = 0;
|
||||
for (int32_t i = 0; i < pSyncNode->batchSize; ++i) {
|
||||
SSyncRaftEntry* pEntry;
|
||||
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, nextIndex, &pEntry);
|
||||
if (code == 0) {
|
||||
ASSERT(pEntry != NULL);
|
||||
// get rpc msg [i] from entry
|
||||
syncEntryDestory(pEntry);
|
||||
getCount++;
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
SyncAppendEntriesBatch* pMsg = syncAppendEntriesBatchBuild(rpcMsgArr, getCount, pSyncNode->vgId);
|
||||
ASSERT(pMsg != NULL);
|
||||
|
||||
// prepare msg
|
||||
pMsg->srcId = pSyncNode->myRaftId;
|
||||
pMsg->destId = *pDestId;
|
||||
pMsg->term = pSyncNode->pRaftStore->currentTerm;
|
||||
pMsg->prevLogIndex = preLogIndex;
|
||||
pMsg->prevLogTerm = preLogTerm;
|
||||
pMsg->commitIndex = pSyncNode->commitIndex;
|
||||
pMsg->privateTerm = 0;
|
||||
pMsg->dataCount = getCount;
|
||||
|
||||
// send msg
|
||||
syncNodeAppendEntriesBatch(pSyncNode, pDestId, pMsg);
|
||||
syncAppendEntriesBatchDestroy(pMsg);
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t syncNodeAppendEntriesPeersSnapshot(SSyncNode* pSyncNode) {
|
||||
ASSERT(pSyncNode->state == TAOS_SYNC_STATE_LEADER);
|
||||
|
||||
|
@ -234,4 +301,24 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c
|
|||
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
|
||||
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
|
||||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncNodeAppendEntriesBatch(SSyncNode* pSyncNode, const SRaftId* destRaftId,
|
||||
const SyncAppendEntriesBatch* pMsg) {
|
||||
do {
|
||||
char host[128];
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(destRaftId->addr, host, sizeof(host), &port);
|
||||
sDebug(
|
||||
"vgId:%d, send sync-append-entries-batch to %s:%d, {term:%lu, pre-index:%ld, pre-term:%lu, pterm:%lu, "
|
||||
"commit:%ld, "
|
||||
"datalen:%d, dataCount:%d}",
|
||||
pSyncNode->vgId, host, port, pMsg->term, pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->privateTerm,
|
||||
pMsg->commitIndex, pMsg->dataLen, pMsg->dataCount);
|
||||
} while (0);
|
||||
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntriesBatch2RpcMsg(pMsg, &rpcMsg);
|
||||
syncNodeSendMsgById(destRaftId, pSyncNode, &rpcMsg);
|
||||
return 0;
|
||||
}
|
|
@ -349,14 +349,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
|
|||
|
||||
char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
|
||||
cJSON *pJson = snapshotSender2Json(pSender);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
|
||||
int32_t len = 256;
|
||||
char *s = taosMemoryMalloc(len);
|
||||
char * s = taosMemoryMalloc(len);
|
||||
|
||||
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
|
||||
char host[64];
|
||||
|
@ -604,7 +604,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
|||
cJSON_AddStringToObject(pFromId, "addr", u64buf);
|
||||
{
|
||||
uint64_t u64 = pReceiver->fromId.addr;
|
||||
cJSON *pTmp = pFromId;
|
||||
cJSON * pTmp = pFromId;
|
||||
char host[128] = {0};
|
||||
uint16_t port;
|
||||
syncUtilU642Addr(u64, host, sizeof(host), &port);
|
||||
|
@ -637,14 +637,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
|
|||
|
||||
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
|
||||
cJSON *pJson = snapshotReceiver2Json(pReceiver);
|
||||
char *serialized = cJSON_Print(pJson);
|
||||
char * serialized = cJSON_Print(pJson);
|
||||
cJSON_Delete(pJson);
|
||||
return serialized;
|
||||
}
|
||||
|
||||
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
|
||||
int32_t len = 256;
|
||||
char *s = taosMemoryMalloc(len);
|
||||
char * s = taosMemoryMalloc(len);
|
||||
|
||||
SRaftId fromId = pReceiver->fromId;
|
||||
char host[128];
|
||||
|
|
|
@ -21,6 +21,7 @@ add_executable(syncEntryCacheTest "")
|
|||
add_executable(syncRequestVoteTest "")
|
||||
add_executable(syncRequestVoteReplyTest "")
|
||||
add_executable(syncAppendEntriesTest "")
|
||||
add_executable(syncAppendEntriesBatchTest "")
|
||||
add_executable(syncAppendEntriesReplyTest "")
|
||||
add_executable(syncClientRequestTest "")
|
||||
add_executable(syncTimeoutTest "")
|
||||
|
@ -146,6 +147,10 @@ target_sources(syncAppendEntriesTest
|
|||
PRIVATE
|
||||
"syncAppendEntriesTest.cpp"
|
||||
)
|
||||
target_sources(syncAppendEntriesBatchTest
|
||||
PRIVATE
|
||||
"syncAppendEntriesBatchTest.cpp"
|
||||
)
|
||||
target_sources(syncAppendEntriesReplyTest
|
||||
PRIVATE
|
||||
"syncAppendEntriesReplyTest.cpp"
|
||||
|
@ -387,6 +392,11 @@ target_include_directories(syncAppendEntriesTest
|
|||
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_include_directories(syncAppendEntriesBatchTest
|
||||
PUBLIC
|
||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||
"${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||
)
|
||||
target_include_directories(syncAppendEntriesReplyTest
|
||||
PUBLIC
|
||||
"${TD_SOURCE_DIR}/include/libs/sync"
|
||||
|
@ -636,6 +646,10 @@ target_link_libraries(syncAppendEntriesTest
|
|||
sync
|
||||
gtest_main
|
||||
)
|
||||
target_link_libraries(syncAppendEntriesBatchTest
|
||||
sync
|
||||
gtest_main
|
||||
)
|
||||
target_link_libraries(syncAppendEntriesReplyTest
|
||||
sync
|
||||
gtest_main
|
||||
|
|
|
@ -0,0 +1,139 @@
|
|||
//#include <gtest/gtest.h>
|
||||
#include <stdio.h>
|
||||
#include "syncIO.h"
|
||||
#include "syncInt.h"
|
||||
#include "syncMessage.h"
|
||||
#include "syncUtil.h"
|
||||
#include "trpc.h"
|
||||
|
||||
void logTest() {
|
||||
sTrace("--- sync log test: trace");
|
||||
sDebug("--- sync log test: debug");
|
||||
sInfo("--- sync log test: info");
|
||||
sWarn("--- sync log test: warn");
|
||||
sError("--- sync log test: error");
|
||||
sFatal("--- sync log test: fatal");
|
||||
}
|
||||
|
||||
SRpcMsg *createRpcMsg(int32_t i, int32_t dataLen) {
|
||||
SRpcMsg *pRpcMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg));
|
||||
memset(pRpcMsg, 0, sizeof(SRpcMsg));
|
||||
|
||||
pRpcMsg->msgType = TDMT_SYNC_PING;
|
||||
pRpcMsg->contLen = dataLen;
|
||||
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
|
||||
pRpcMsg->code = 10 * i;
|
||||
snprintf((char *)pRpcMsg->pCont, pRpcMsg->contLen, "value_%d", i);
|
||||
|
||||
return pRpcMsg;
|
||||
}
|
||||
|
||||
SyncAppendEntriesBatch *createMsg() {
|
||||
SRpcMsg rpcMsgArr[5];
|
||||
memset(rpcMsgArr, 0, sizeof(rpcMsgArr));
|
||||
|
||||
for (int32_t i = 0; i < 5; ++i) {
|
||||
SRpcMsg *pRpcMsg = createRpcMsg(i, 20);
|
||||
rpcMsgArr[i] = *pRpcMsg;
|
||||
taosMemoryFree(pRpcMsg);
|
||||
}
|
||||
|
||||
SyncAppendEntriesBatch *pMsg = syncAppendEntriesBatchBuild(rpcMsgArr, 5, 1234);
|
||||
pMsg->srcId.addr = syncUtilAddr2U64("127.0.0.1", 1234);
|
||||
pMsg->srcId.vgId = 100;
|
||||
pMsg->destId.addr = syncUtilAddr2U64("127.0.0.1", 5678);
|
||||
pMsg->destId.vgId = 100;
|
||||
pMsg->prevLogIndex = 11;
|
||||
pMsg->prevLogTerm = 22;
|
||||
pMsg->commitIndex = 33;
|
||||
pMsg->privateTerm = 44;
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
void test1() {
|
||||
SyncAppendEntriesBatch *pMsg = createMsg();
|
||||
syncAppendEntriesBatchLog2((char *)"test1:", pMsg);
|
||||
|
||||
SRpcMsg rpcMsgArr[5];
|
||||
int32_t retArrSize;
|
||||
syncAppendEntriesBatch2RpcMsgArray(pMsg, rpcMsgArr, 5, &retArrSize);
|
||||
for (int i = 0; i < retArrSize; ++i) {
|
||||
char logBuf[128];
|
||||
snprintf(logBuf, sizeof(logBuf), "==test1 decode rpc msg %d: msgType:%d, code:%d, contLen:%d, pCont:%s \n", i,
|
||||
rpcMsgArr[i].msgType, rpcMsgArr[i].code, rpcMsgArr[i].contLen, (char *)rpcMsgArr[i].pCont);
|
||||
sTrace("%s", logBuf);
|
||||
}
|
||||
|
||||
syncAppendEntriesBatchDestroy(pMsg);
|
||||
}
|
||||
|
||||
/*
|
||||
void test2() {
|
||||
SyncAppendEntries *pMsg = createMsg();
|
||||
uint32_t len = pMsg->bytes;
|
||||
char * serialized = (char *)taosMemoryMalloc(len);
|
||||
syncAppendEntriesSerialize(pMsg, serialized, len);
|
||||
SyncAppendEntries *pMsg2 = syncAppendEntriesBuild(pMsg->dataLen, 1000);
|
||||
syncAppendEntriesDeserialize(serialized, len, pMsg2);
|
||||
syncAppendEntriesLog2((char *)"test2: syncAppendEntriesSerialize -> syncAppendEntriesDeserialize ", pMsg2);
|
||||
|
||||
taosMemoryFree(serialized);
|
||||
syncAppendEntriesDestroy(pMsg);
|
||||
syncAppendEntriesDestroy(pMsg2);
|
||||
}
|
||||
|
||||
void test3() {
|
||||
SyncAppendEntries *pMsg = createMsg();
|
||||
uint32_t len;
|
||||
char * serialized = syncAppendEntriesSerialize2(pMsg, &len);
|
||||
SyncAppendEntries *pMsg2 = syncAppendEntriesDeserialize2(serialized, len);
|
||||
syncAppendEntriesLog2((char *)"test3: syncAppendEntriesSerialize3 -> syncAppendEntriesDeserialize2 ", pMsg2);
|
||||
|
||||
taosMemoryFree(serialized);
|
||||
syncAppendEntriesDestroy(pMsg);
|
||||
syncAppendEntriesDestroy(pMsg2);
|
||||
}
|
||||
|
||||
void test4() {
|
||||
SyncAppendEntries *pMsg = createMsg();
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
|
||||
SyncAppendEntries *pMsg2 = (SyncAppendEntries *)taosMemoryMalloc(rpcMsg.contLen);
|
||||
syncAppendEntriesFromRpcMsg(&rpcMsg, pMsg2);
|
||||
syncAppendEntriesLog2((char *)"test4: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg ", pMsg2);
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncAppendEntriesDestroy(pMsg);
|
||||
syncAppendEntriesDestroy(pMsg2);
|
||||
}
|
||||
|
||||
void test5() {
|
||||
SyncAppendEntries *pMsg = createMsg();
|
||||
SRpcMsg rpcMsg;
|
||||
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
|
||||
SyncAppendEntries *pMsg2 = syncAppendEntriesFromRpcMsg2(&rpcMsg);
|
||||
syncAppendEntriesLog2((char *)"test5: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg2 ", pMsg2);
|
||||
|
||||
rpcFreeCont(rpcMsg.pCont);
|
||||
syncAppendEntriesDestroy(pMsg);
|
||||
syncAppendEntriesDestroy(pMsg2);
|
||||
}
|
||||
*/
|
||||
|
||||
int main() {
|
||||
gRaftDetailLog = true;
|
||||
tsAsyncLog = 0;
|
||||
sDebugFlag = DEBUG_DEBUG + DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE;
|
||||
logTest();
|
||||
|
||||
test1();
|
||||
|
||||
/*
|
||||
test2();
|
||||
test3();
|
||||
test4();
|
||||
test5();
|
||||
*/
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -77,7 +77,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) {
|
||||
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) {
|
||||
*ppReader = (void*)0xABCD;
|
||||
char logBuf[256] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader);
|
||||
|
|
|
@ -25,7 +25,7 @@ void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta)
|
|||
|
||||
int32_t GetSnapshot(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) { return 0; }
|
||||
|
||||
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { return 0; }
|
||||
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) { return 0; }
|
||||
int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; }
|
||||
int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; }
|
||||
|
||||
|
|
|
@ -74,7 +74,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) {
|
||||
int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void *pParam, void** ppReader) {
|
||||
*ppReader = (void*)0xABCD;
|
||||
char logBuf[256] = {0};
|
||||
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p", pFsm, *ppReader);
|
||||
|
|
|
@ -429,6 +429,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEW_CONFIG_ERROR, "Sync new config error
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_RECONFIG_NOT_READY, "Sync not ready for reconfig")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for propose")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_STANDBY_NOT_READY, "Sync not ready for standby")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_BATCH_ERROR, "Sync batch error")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_SYN_INTERNAL_ERROR, "Sync internal error")
|
||||
|
||||
// wal
|
||||
|
|
Loading…
Reference in New Issue