Merge pull request #14604 from taosdata/feature/3.0_mhli

refactor(sync): add term in rpcMsg
This commit is contained in:
Li Minghao 2022-07-06 17:35:07 +08:00 committed by GitHub
commit 767d21fa81
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 172 additions and 105 deletions

View File

@ -215,6 +215,7 @@ int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, in
bool syncEnvIsStart(); bool syncEnvIsStart();
const char* syncStr(ESyncState state); const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid); bool syncIsRestoreFinish(int64_t rid);
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg); int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg);

View File

@ -27,7 +27,7 @@ extern "C" {
#define TAOS_CONN_SERVER 0 #define TAOS_CONN_SERVER 0
#define TAOS_CONN_CLIENT 1 #define TAOS_CONN_CLIENT 1
#define IsReq(pMsg) (pMsg->msgType & 1U) #define IsReq(pMsg) (pMsg->msgType & 1U)
extern int32_t tsRpcHeadSize; extern int32_t tsRpcHeadSize;
@ -35,12 +35,13 @@ typedef struct {
uint32_t clientIp; uint32_t clientIp;
uint16_t clientPort; uint16_t clientPort;
int64_t applyIndex; int64_t applyIndex;
uint64_t applyTerm;
char user[TSDB_USER_LEN]; char user[TSDB_USER_LEN];
} SRpcConnInfo; } SRpcConnInfo;
typedef struct SRpcHandleInfo { typedef struct SRpcHandleInfo {
// rpc info // rpc info
void * handle; // rpc handle returned to app void *handle; // rpc handle returned to app
int64_t refId; // refid, used by server int64_t refId; // refid, used by server
int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp); int32_t noResp; // has response or not(default 0, 0: resp, 1: no resp);
int32_t persistHandle; // persist handle or not int32_t persistHandle; // persist handle or not
@ -53,7 +54,7 @@ typedef struct SRpcHandleInfo {
void *node; // node mgmt handle void *node; // node mgmt handle
// resp info // resp info
void * rsp; void *rsp;
int32_t rspLen; int32_t rspLen;
// conn info // conn info
@ -62,7 +63,7 @@ typedef struct SRpcHandleInfo {
typedef struct SRpcMsg { typedef struct SRpcMsg {
tmsg_t msgType; tmsg_t msgType;
void * pCont; void *pCont;
int32_t contLen; int32_t contLen;
int32_t code; int32_t code;
SRpcHandleInfo info; SRpcHandleInfo info;
@ -74,7 +75,7 @@ typedef bool (*RpcRfp)(int32_t code, tmsg_t msgType);
typedef struct SRpcInit { typedef struct SRpcInit {
char localFqdn[TSDB_FQDN_LEN]; char localFqdn[TSDB_FQDN_LEN];
uint16_t localPort; // local port uint16_t localPort; // local port
char * label; // for debug purpose char *label; // for debug purpose
int32_t numOfThreads; // number of threads to handle connections int32_t numOfThreads; // number of threads to handle connections
int32_t sessions; // number of sessions allowed int32_t sessions; // number of sessions allowed
int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS int8_t connType; // TAOS_CONN_UDP, TAOS_CONN_TCPC, TAOS_CONN_TCPS
@ -99,12 +100,12 @@ typedef struct {
typedef struct { typedef struct {
int32_t msgType; int32_t msgType;
void * val; void *val;
int32_t (*clone)(void *src, void **dst); int32_t (*clone)(void *src, void **dst);
} SRpcBrokenlinkVal; } SRpcBrokenlinkVal;
typedef struct { typedef struct {
SHashObj * args; SHashObj *args;
SRpcBrokenlinkVal brokenVal; SRpcBrokenlinkVal brokenVal;
void (*freeFunc)(const void *arg); void (*freeFunc)(const void *arg);
} SRpcCtx; } SRpcCtx;

View File

@ -234,17 +234,14 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
} }
#if 1 #if 1
char *syncNodeStr = sync2SimpleStr(pVnode->sync); do {
static int64_t vndTick = 0; char *syncNodeStr = sync2SimpleStr(pVnode->sync);
if (++vndTick % 10 == 1) { static int64_t vndTick = 0;
vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr); if (++vndTick % 10 == 1) {
} vGTrace("vgId:%d, sync trace msg:%s, %s", syncGetVgId(pVnode->sync), TMSG_INFO(pMsg->msgType), syncNodeStr);
if (gRaftDetailLog) { }
char logBuf[512] = {0}; taosMemoryFree(syncNodeStr);
snprintf(logBuf, sizeof(logBuf), "vnode process syncmsg, msgType:%d, syncNode:%s", pMsg->msgType, syncNodeStr); } while (0);
syncRpcMsgLog2(logBuf, pMsg);
}
taosMemoryFree(syncNodeStr);
#endif #endif
if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) { if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) {
@ -297,7 +294,8 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType); vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg->msgType);
code = -1; code = -1;
} }
} else {
} else if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_WAL_FIRST) {
// use wal first strategy // use wal first strategy
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
@ -403,43 +401,53 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
} }
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
SVnode *pVnode = pFsm->data; SVnode *pVnode = pFsm->data;
SSnapshot snapshot = {0}; vTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
SyncIndex beginIndex = SYNC_INDEX_INVALID; syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
char logBuf[256] = {0}; syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
snprintf(logBuf, sizeof(logBuf),
"commitCb execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", pFsm,
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index; rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} }
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256] = {0}; SVnode *pVnode = pFsm->data;
snprintf(logBuf, sizeof(logBuf), "preCommitCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
} }
static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
char logBuf[256] = {0}; SVnode *pVnode = pFsm->data;
snprintf(logBuf, sizeof(logBuf), "rollBackCb== pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s \n", pFsm, vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state)); syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
} }
static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) { return 0; } static int32_t vnodeSnapshotStartRead(struct SSyncFSM *pFsm, void *pParam, void **ppReader) {
SVnode *pVnode = pFsm->data;
SSnapshotParam *pSnapshotParam = pParam;
int32_t code =
vnodeSnapshotReaderOpen(pVnode, (SVSnapshotReader **)ppReader, pSnapshotParam->start, pSnapshotParam->end);
return code;
}
static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) { return 0; } static int32_t vnodeSnapshotStopRead(struct SSyncFSM *pFsm, void *pReader) {
SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapshotReaderClose(pReader);
return code;
}
static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) { return 0; } static int32_t vnodeSnapshotDoRead(struct SSyncFSM *pFsm, void *pReader, void **ppBuf, int32_t *len) {
SVnode *pVnode = pFsm->data;
int32_t code = vnodeSnapshotRead(pReader, (const void **)ppBuf, len);
return code;
}
static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { return 0; } static int32_t vnodeSnapshotStartWrite(struct SSyncFSM *pFsm, void *pParam, void **ppWriter) { return 0; }

View File

@ -36,6 +36,7 @@ typedef struct SRaftCfg {
TdFilePtr pFile; TdFilePtr pFile;
char path[TSDB_FILENAME_LEN * 2]; char path[TSDB_FILENAME_LEN * 2];
int8_t isStandBy; int8_t isStandBy;
int32_t batchSize;
int8_t snapshotStrategy; int8_t snapshotStrategy;
SyncIndex lastConfigIndex; SyncIndex lastConfigIndex;
@ -49,19 +50,20 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg);
int32_t raftCfgPersist(SRaftCfg *pRaftCfg); int32_t raftCfgPersist(SRaftCfg *pRaftCfg);
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex); int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg); cJSON * syncCfg2Json(SSyncCfg *pSyncCfg);
char *syncCfg2Str(SSyncCfg *pSyncCfg); char * syncCfg2Str(SSyncCfg *pSyncCfg);
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg); char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg); int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg); int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg);
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg); cJSON * raftCfg2Json(SRaftCfg *pRaftCfg);
char *raftCfg2Str(SRaftCfg *pRaftCfg); char * raftCfg2Str(SRaftCfg *pRaftCfg);
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg); int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg);
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);
typedef struct SRaftCfgMeta { typedef struct SRaftCfgMeta {
int8_t isStandBy; int8_t isStandBy;
int32_t batchSize;
int8_t snapshotStrategy; int8_t snapshotStrategy;
SyncIndex lastConfigIndex; SyncIndex lastConfigIndex;
} SRaftCfgMeta; } SRaftCfgMeta;

View File

@ -40,14 +40,14 @@ typedef struct SSyncSnapshotSender {
bool start; bool start;
int32_t seq; int32_t seq;
int32_t ack; int32_t ack;
void *pReader; void * pReader;
void *pCurrentBlock; void * pCurrentBlock;
int32_t blockLen; int32_t blockLen;
SSnapshotParam snapshotParam; SSnapshotParam snapshotParam;
SSnapshot snapshot; SSnapshot snapshot;
SSyncCfg lastConfig; SSyncCfg lastConfig;
int64_t sendingMS; int64_t sendingMS;
SSyncNode *pSyncNode; SSyncNode * pSyncNode;
int32_t replicaIndex; int32_t replicaIndex;
SyncTerm term; SyncTerm term;
SyncTerm privateTerm; SyncTerm privateTerm;
@ -64,20 +64,20 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char *snapshotSender2Str(SSyncSnapshotSender *pSender); char * snapshotSender2Str(SSyncSnapshotSender *pSender);
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
//--------------------------------------------------- //---------------------------------------------------
typedef struct SSyncSnapshotReceiver { typedef struct SSyncSnapshotReceiver {
bool start; bool start;
int32_t ack; int32_t ack;
void *pWriter; void * pWriter;
SyncTerm term; SyncTerm term;
SyncTerm privateTerm; SyncTerm privateTerm;
SSnapshotParam snapshotParam; SSnapshotParam snapshotParam;
SSnapshot snapshot; SSnapshot snapshot;
SRaftId fromId; SRaftId fromId;
SSyncNode *pSyncNode; SSyncNode * pSyncNode;
} SSyncSnapshotReceiver; } SSyncSnapshotReceiver;
@ -88,8 +88,8 @@ int32_t snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver);
bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver); bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
//--------------------------------------------------- //---------------------------------------------------
// on message // on message

View File

@ -834,7 +834,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
// //
// operation: // operation:
// if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry // if hasAppendEntries && pMsg->prevLogIndex == ths->commitIndex, append entry
// match my-commit-index or my-commit-index + 1 // match my-commit-index or my-commit-index + batchSize
do { do {
bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) && bool condition = (pMsg->term == ths->pRaftStore->currentTerm) && (ths->state == TAOS_SYNC_STATE_FOLLOWER) &&
(pMsg->prevLogIndex <= ths->commitIndex); (pMsg->prevLogIndex <= ths->commitIndex);
@ -928,11 +928,13 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc
bool condition = condition1 || condition2; bool condition = condition1 || condition2;
if (condition) { if (condition) {
char logBuf[128]; do {
snprintf(logBuf, sizeof(logBuf), char logBuf[128];
"recv sync-append-entries-batch, not match, pre-index:%ld, pre-term:%lu, datalen:%d", pMsg->prevLogIndex, snprintf(logBuf, sizeof(logBuf),
pMsg->prevLogTerm, pMsg->dataLen); "recv sync-append-entries-batch, not match, pre-index:%ld, pre-term:%lu, datalen:%d",
syncNodeEventLog(ths, logBuf); pMsg->prevLogIndex, pMsg->prevLogTerm, pMsg->dataLen);
syncNodeEventLog(ths, logBuf);
} while (0);
// prepare response msg // prepare response msg
SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId); SyncAppendEntriesReply* pReply = syncAppendEntriesReplyBuild(ths->vgId);

View File

@ -109,19 +109,30 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
} }
// only start once // only start once
static void syncNodeStartSnapshot(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm, static void syncNodeStartSnapshotOnce(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, SyncTerm lastApplyTerm,
SyncAppendEntriesReply* pMsg) { SyncAppendEntriesReply* pMsg) {
// get sender // get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));
ASSERT(pSender != NULL); ASSERT(pSender != NULL);
if (snapshotSenderIsStart(pSender)) {
do {
char* eventLog = snapshotSender2SimpleStr(pSender, "snapshot sender already start");
syncNodeErrorLog(ths, eventLog);
taosMemoryFree(eventLog);
} while (0);
return;
}
SSnapshot snapshot = { SSnapshot snapshot = {
.data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID}; .data = NULL, .lastApplyIndex = endIndex, .lastApplyTerm = lastApplyTerm, .lastConfigIndex = SYNC_INDEX_INVALID};
void* pReader = NULL; void* pReader = NULL;
SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex}; SSnapshotParam readerParam = {.start = beginIndex, .end = endIndex};
ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader); int32_t code = ths->pFsm->FpSnapshotStartRead(ths->pFsm, &readerParam, &pReader);
if (!snapshotSenderIsStart(pSender) && pMsg->privateTerm < pSender->privateTerm) { ASSERT(code == 0);
if (pMsg->privateTerm < pSender->privateTerm) {
ASSERT(pReader != NULL); ASSERT(pReader != NULL);
snapshotSenderStart(pSender, readerParam, snapshot, pReader); snapshotSenderStart(pSender, readerParam, snapshot, pReader);
@ -178,7 +189,9 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
// start snapshot <match+1, old snapshot.end> // start snapshot <match+1, old snapshot.end>
SSnapshot oldSnapshot; SSnapshot oldSnapshot;
ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot); ths->pFsm->FpGetSnapshotInfo(ths->pFsm, &oldSnapshot);
syncNodeStartSnapshot(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm, pMsg); ASSERT(oldSnapshot.lastApplyIndex >= newMatchIndex + 1);
syncNodeStartSnapshotOnce(ths, newMatchIndex + 1, oldSnapshot.lastApplyIndex, oldSnapshot.lastApplyTerm,
pMsg); // term maybe not ok?
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), oldSnapshot.lastApplyIndex + 1); syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), oldSnapshot.lastApplyIndex + 1);
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex); syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), newMatchIndex);
@ -187,7 +200,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
} else { } else {
SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId)); SyncIndex nextIndex = syncIndexMgrGetIndex(ths->pNextIndex, &(pMsg->srcId));
// notice! int64, uint64
if (nextIndex > SYNC_INDEX_BEGIN) { if (nextIndex > SYNC_INDEX_BEGIN) {
--nextIndex; --nextIndex;
@ -198,7 +210,7 @@ int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntrie
SSyncRaftEntry* pEntry; SSyncRaftEntry* pEntry;
int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, nextIndex, &pEntry); int32_t code = ths->pLogStore->syncLogGetEntry(ths->pLogStore, nextIndex, &pEntry);
ASSERT(code == 0); ASSERT(code == 0);
syncNodeStartSnapshot(ths, SYNC_INDEX_BEGIN, nextIndex, pEntry->term, pMsg); syncNodeStartSnapshotOnce(ths, SYNC_INDEX_BEGIN, nextIndex, pEntry->term, pMsg);
// get sender // get sender
SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId)); SSyncSnapshotSender* pSender = syncNodeGetSnapshotSender(ths, &(pMsg->srcId));

View File

@ -397,6 +397,38 @@ bool syncIsRestoreFinish(int64_t rid) {
return b; return b;
} }
int32_t syncGetSnapshotByIndex(int64_t rid, SyncIndex index, SSnapshot* pSnapshot) {
if (index < SYNC_INDEX_BEGIN) {
return -1;
}
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) {
return -1;
}
ASSERT(rid == pSyncNode->rid);
SSyncRaftEntry* pEntry = NULL;
int32_t code = pSyncNode->pLogStore->syncLogGetEntry(pSyncNode->pLogStore, index, &pEntry);
if (code != 0) {
if (pEntry != NULL) {
syncEntryDestory(pEntry);
}
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return -1;
}
ASSERT(pEntry != NULL);
pSnapshot->data = NULL;
pSnapshot->lastApplyIndex = index;
pSnapshot->lastApplyTerm = pEntry->term;
pSnapshot->lastConfigIndex = syncNodeGetSnapshotConfigIndex(pSyncNode, index);
syncEntryDestory(pEntry);
taosReleaseRef(tsNodeRefId, pSyncNode->rid);
return 0;
}
int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) { int32_t syncGetSnapshotMeta(int64_t rid, struct SSnapshotMeta* sMeta) {
SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid); SSyncNode* pSyncNode = (SSyncNode*)taosAcquireRef(tsNodeRefId, rid);
if (pSyncNode == NULL) { if (pSyncNode == NULL) {
@ -786,6 +818,7 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
int32_t code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, &retIndex); int32_t code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, &retIndex);
if (code == 0) { if (code == 0) {
pMsg->info.conn.applyIndex = retIndex; pMsg->info.conn.applyIndex = retIndex;
pMsg->info.conn.applyTerm = pSyncNode->pRaftStore->currentTerm;
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum); syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
ret = 1; ret = 1;
@ -846,6 +879,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
meta.isStandBy = pSyncInfo->isStandBy; meta.isStandBy = pSyncInfo->isStandBy;
meta.snapshotStrategy = pSyncInfo->snapshotStrategy; meta.snapshotStrategy = pSyncInfo->snapshotStrategy;
meta.lastConfigIndex = SYNC_INDEX_INVALID; meta.lastConfigIndex = SYNC_INDEX_INVALID;
meta.batchSize = pSyncInfo->batchSize;
ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath); ret = raftCfgCreateFile((SSyncCfg*)&(pSyncInfo->syncCfg), meta, pSyncNode->configPath);
ASSERT(ret == 0); ASSERT(ret == 0);

View File

@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char *syncCfg2Str(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) {
cJSON *pJson = syncCfg2Json(pSyncCfg); cJSON *pJson = syncCfg2Json(pSyncCfg);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
if (pSyncCfg != NULL) { if (pSyncCfg != NULL) {
int32_t len = 512; int32_t len = 512;
char *s = taosMemoryMalloc(len); char * s = taosMemoryMalloc(len);
memset(s, 0, len); memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
@ -183,6 +183,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg))); cJSON_AddItemToObject(pRoot, "SSyncCfg", syncCfg2Json(&(pRaftCfg->cfg)));
cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy); cJSON_AddNumberToObject(pRoot, "isStandBy", pRaftCfg->isStandBy);
cJSON_AddNumberToObject(pRoot, "snapshotStrategy", pRaftCfg->snapshotStrategy); cJSON_AddNumberToObject(pRoot, "snapshotStrategy", pRaftCfg->snapshotStrategy);
cJSON_AddNumberToObject(pRoot, "batchSize", pRaftCfg->batchSize);
char buf64[128]; char buf64[128];
snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex); snprintf(buf64, sizeof(buf64), "%ld", pRaftCfg->lastConfigIndex);
@ -205,7 +206,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char *raftCfg2Str(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) {
cJSON *pJson = raftCfg2Json(pRaftCfg); cJSON *pJson = raftCfg2Json(pRaftCfg);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
@ -228,6 +229,7 @@ int32_t raftCfgCreateFile(SSyncCfg *pCfg, SRaftCfgMeta meta, const char *path) {
SRaftCfg raftCfg; SRaftCfg raftCfg;
raftCfg.cfg = *pCfg; raftCfg.cfg = *pCfg;
raftCfg.isStandBy = meta.isStandBy; raftCfg.isStandBy = meta.isStandBy;
raftCfg.batchSize = meta.batchSize;
raftCfg.snapshotStrategy = meta.snapshotStrategy; raftCfg.snapshotStrategy = meta.snapshotStrategy;
raftCfg.lastConfigIndex = meta.lastConfigIndex; raftCfg.lastConfigIndex = meta.lastConfigIndex;
raftCfg.configIndexCount = 1; raftCfg.configIndexCount = 1;
@ -257,6 +259,9 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy"); cJSON *pJsonIsStandBy = cJSON_GetObjectItem(pJson, "isStandBy");
pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy); pRaftCfg->isStandBy = cJSON_GetNumberValue(pJsonIsStandBy);
cJSON *pJsonBatchSize = cJSON_GetObjectItem(pJson, "batchSize");
pRaftCfg->batchSize = cJSON_GetNumberValue(pJsonBatchSize);
cJSON *pJsonSnapshotStrategy = cJSON_GetObjectItem(pJson, "snapshotStrategy"); cJSON *pJsonSnapshotStrategy = cJSON_GetObjectItem(pJson, "snapshotStrategy");
pRaftCfg->snapshotStrategy = cJSON_GetNumberValue(pJsonSnapshotStrategy); pRaftCfg->snapshotStrategy = cJSON_GetNumberValue(pJsonSnapshotStrategy);
@ -280,7 +285,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
} }
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0); ASSERT(code == 0);

View File

@ -127,7 +127,7 @@ void syncRespCleanByTTL(SSyncRespMgr *pObj, int64_t ttl) {
while (pStub) { while (pStub) {
size_t len; size_t len;
void *key = taosHashGetKey(pStub, &len); void * key = taosHashGetKey(pStub, &len);
SyncIndex *pIndex = (SyncIndex *)key; SyncIndex *pIndex = (SyncIndex *)key;
int64_t nowMS = taosGetTimestampMs(); int64_t nowMS = taosGetTimestampMs();

View File

@ -374,14 +374,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender); cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
int32_t len = 256; int32_t len = 256;
char *s = taosMemoryMalloc(len); char * s = taosMemoryMalloc(len);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
char host[64]; char host[64];
@ -644,7 +644,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf); cJSON_AddStringToObject(pFromId, "addr", u64buf);
{ {
uint64_t u64 = pReceiver->fromId.addr; uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId; cJSON * pTmp = pFromId;
char host[128] = {0}; char host[128] = {0};
uint16_t port; uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port); syncUtilU642Addr(u64, host, sizeof(host), &port);
@ -677,14 +677,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver); cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
int32_t len = 256; int32_t len = 256;
char *s = taosMemoryMalloc(len); char * s = taosMemoryMalloc(len);
SRaftId fromId = pReceiver->fromId; SRaftId fromId = pReceiver->fromId;
char host[128]; char host[128];

View File

@ -54,15 +54,15 @@ void test1() {
SyncAppendEntriesBatch *pMsg = createMsg(); SyncAppendEntriesBatch *pMsg = createMsg();
syncAppendEntriesBatchLog2((char *)"==test1==", pMsg); syncAppendEntriesBatchLog2((char *)"==test1==", pMsg);
/* /*
SOffsetAndContLen *metaArr = syncAppendEntriesBatchMetaTableArray(pMsg); SOffsetAndContLen *metaArr = syncAppendEntriesBatchMetaTableArray(pMsg);
int32_t retArrSize = pMsg->dataCount; int32_t retArrSize = pMsg->dataCount;
for (int i = 0; i < retArrSize; ++i) { for (int i = 0; i < retArrSize; ++i) {
SSyncRaftEntry *pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset); SSyncRaftEntry *pEntry = (SSyncRaftEntry*)(pMsg->data + metaArr[i].offset);
ASSERT(pEntry->bytes == metaArr[i].contLen); ASSERT(pEntry->bytes == metaArr[i].contLen);
syncEntryPrint(pEntry); syncEntryPrint(pEntry);
} }
*/ */
syncAppendEntriesBatchDestroy(pMsg); syncAppendEntriesBatchDestroy(pMsg);
} }

View File

@ -114,7 +114,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32
return 0; return 0;
} }
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) { int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) {
*ppWriter = (void*)0xCDEF; *ppWriter = (void*)0xCDEF;
char logBuf[256] = {0}; char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartWrite== pFsm:%p, *ppWriter:%p", pFsm, *ppWriter); snprintf(logBuf, sizeof(logBuf), "==callback== ==SnapshotStartWrite== pFsm:%p, *ppWriter:%p", pFsm, *ppWriter);

View File

@ -8,11 +8,10 @@ void print(SHashObj *pNextIndex) {
printf("----------------\n"); printf("----------------\n");
uint64_t *p = (uint64_t *)taosHashIterate(pNextIndex, NULL); uint64_t *p = (uint64_t *)taosHashIterate(pNextIndex, NULL);
while (p) { while (p) {
size_t len; size_t len;
void* key = taosHashGetKey(p, &len); void * key = taosHashGetKey(p, &len);
SRaftId *pRaftId = (SRaftId*)key; SRaftId *pRaftId = (SRaftId *)key;
printf("key:<%lu, %d>, value:%lu \n", pRaftId->addr, pRaftId->vgId, *p); printf("key:<%lu, %d>, value:%lu \n", pRaftId->addr, pRaftId->vgId, *p);
p = (uint64_t *)taosHashIterate(pNextIndex, p); p = (uint64_t *)taosHashIterate(pNextIndex, p);

View File

@ -26,6 +26,7 @@ SRaftCfg* createRaftCfg() {
snprintf(((pCfg->cfg.nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->cfg.nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i); snprintf(((pCfg->cfg.nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->cfg.nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i);
} }
pCfg->isStandBy = taosGetTimestampSec() % 100; pCfg->isStandBy = taosGetTimestampSec() % 100;
pCfg->batchSize = taosGetTimestampSec() % 100;
pCfg->configIndexCount = 5; pCfg->configIndexCount = 5;
for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) { for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) {
@ -84,6 +85,7 @@ void test3() {
SRaftCfgMeta meta; SRaftCfgMeta meta;
meta.isStandBy = 7; meta.isStandBy = 7;
meta.snapshotStrategy = 9; meta.snapshotStrategy = 9;
meta.batchSize = 10;
meta.lastConfigIndex = 789; meta.lastConfigIndex = 789;
raftCfgCreateFile(pCfg, meta, s); raftCfgCreateFile(pCfg, meta, s);
printf("%s create json file: %s \n", (char*)__FUNCTION__, s); printf("%s create json file: %s \n", (char*)__FUNCTION__, s);
@ -109,6 +111,7 @@ void test5() {
pCfg->cfg.myIndex = taosGetTimestampSec(); pCfg->cfg.myIndex = taosGetTimestampSec();
pCfg->isStandBy += 2; pCfg->isStandBy += 2;
pCfg->snapshotStrategy += 3; pCfg->snapshotStrategy += 3;
pCfg->batchSize += 4;
pCfg->lastConfigIndex += 1000; pCfg->lastConfigIndex += 1000;
pCfg->configIndexCount = 5; pCfg->configIndexCount = 5;

View File

@ -74,7 +74,7 @@ void syncRespMgrGetAndDelTest(uint64_t i) {
} }
SSyncNode *createSyncNode() { SSyncNode *createSyncNode() {
SSyncNode *pSyncNode = (SSyncNode*)taosMemoryMalloc(sizeof(SSyncNode)); SSyncNode *pSyncNode = (SSyncNode *)taosMemoryMalloc(sizeof(SSyncNode));
memset(pSyncNode, 0, sizeof(SSyncNode)); memset(pSyncNode, 0, sizeof(SSyncNode));
return pSyncNode; return pSyncNode;
} }

View File

@ -29,7 +29,7 @@ int32_t SnapshotStartRead(struct SSyncFSM* pFsm, void** ppReader) { return 0; }
int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; } int32_t SnapshotStopRead(struct SSyncFSM* pFsm, void* pReader) { return 0; }
int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; } int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32_t* len) { return 0; }
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) { return 0; } int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) { return 0; }
int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; } int32_t SnapshotStopWrite(struct SSyncFSM* pFsm, void* pWriter, bool isApply) { return 0; }
int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; } int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_t len) { return 0; }

View File

@ -111,7 +111,7 @@ int32_t SnapshotDoRead(struct SSyncFSM* pFsm, void* pReader, void** ppBuf, int32
return 0; return 0;
} }
int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void *pParam, void** ppWriter) { int32_t SnapshotStartWrite(struct SSyncFSM* pFsm, void* pParam, void** ppWriter) {
*ppWriter = (void*)0xCDEF; *ppWriter = (void*)0xCDEF;
char logBuf[256] = {0}; char logBuf[256] = {0};
@ -314,18 +314,18 @@ int main(int argc, char** argv) {
exit(-1); exit(-1);
} }
int32_t replicaNum = atoi(argv[1]); int32_t replicaNum = atoi(argv[1]);
int32_t myIndex = atoi(argv[2]); int32_t myIndex = atoi(argv[2]);
ESyncStrategy enableSnapshot = (ESyncStrategy)atoi(argv[3]); ESyncStrategy enableSnapshot = (ESyncStrategy)atoi(argv[3]);
int32_t lastApplyIndex = atoi(argv[4]); int32_t lastApplyIndex = atoi(argv[4]);
int32_t lastApplyTerm = atoi(argv[5]); int32_t lastApplyTerm = atoi(argv[5]);
int32_t writeRecordNum = atoi(argv[6]); int32_t writeRecordNum = atoi(argv[6]);
bool isStandBy = atoi(argv[7]); bool isStandBy = atoi(argv[7]);
int32_t isConfigChange = atoi(argv[8]); int32_t isConfigChange = atoi(argv[8]);
int32_t iterTimes = atoi(argv[9]); int32_t iterTimes = atoi(argv[9]);
int32_t finishLastApplyIndex = atoi(argv[10]); int32_t finishLastApplyIndex = atoi(argv[10]);
int32_t finishLastApplyTerm = atoi(argv[11]); int32_t finishLastApplyTerm = atoi(argv[11]);
int32_t leaderTransfer = atoi(argv[12]); int32_t leaderTransfer = atoi(argv[12]);
sInfo( sInfo(
"args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, " "args: replicaNum:%d, myIndex:%d, enableSnapshot:%d, lastApplyIndex:%d, lastApplyTerm:%d, writeRecordNum:%d, "