Merge pull request #14272 from taosdata/feature/3.0_mhli

refactor(sync): optimized only one replica
This commit is contained in:
Li Minghao 2022-06-27 11:05:39 +08:00 committed by GitHub
commit 486f268353
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 173 additions and 106 deletions

View File

@ -26,9 +26,9 @@ extern "C" {
extern bool gRaftDetailLog; extern bool gRaftDetailLog;
#define SYNC_INDEX_BEGIN 0 #define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1 #define SYNC_INDEX_INVALID -1
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF #define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
typedef uint64_t SyncNodeId; typedef uint64_t SyncNodeId;
typedef int32_t SyncGroupId; typedef int32_t SyncGroupId;

View File

@ -524,7 +524,7 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg); int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex);
int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
@ -541,7 +541,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg);
// ----------------------------------------- // -----------------------------------------
typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg); typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg);
typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg); typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg);
typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SyncClientRequest* pMsg); typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex);
typedef int32_t (*FpOnRequestVoteCb)(SSyncNode* ths, SyncRequestVote* pMsg); typedef int32_t (*FpOnRequestVoteCb)(SSyncNode* ths, SyncRequestVote* pMsg);
typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* pMsg); typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg); typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg);

View File

@ -442,7 +442,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
syncPingReplyDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg); code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
syncClientRequestDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
@ -491,7 +491,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
syncPingReplyDestroy(pSyncMsg); syncPingReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg);
code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg); code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
syncClientRequestDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);

View File

@ -137,6 +137,25 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
vError("vgId:%d, failed to pre-process msg:%p since %s", vgId, pMsg, terrstr()); vError("vgId:%d, failed to pre-process msg:%p since %s", vgId, pMsg, terrstr());
} else { } else {
code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType)); code = syncPropose(pVnode->sync, pMsg, vnodeIsMsgWeak(pMsg->msgType));
if (code == 1) {
do {
static int32_t cnt = 0;
if (cnt++ % 1000 == 1) {
vInfo("vgId:%d, msg:%p apply right now, apply index:%ld, msgtype:%s,%d", vgId, pMsg,
pMsg->info.conn.applyIndex, TMSG_INFO(pMsg->msgType), pMsg->msgType);
}
} while (0);
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (vnodeProcessWriteReq(pVnode, pMsg, pMsg->info.conn.applyIndex, &rsp) < 0) {
rsp.code = terrno;
vInfo("vgId:%d, msg:%p failed to apply right now since %s", vgId, pMsg, terrstr());
}
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
}
} }
} }
@ -163,10 +182,12 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info}; SRpcMsg rsp = {.code = TSDB_CODE_RPC_REDIRECT, .info = pMsg->info};
tmsgSendRedirectRsp(&rsp, &newEpSet); tmsgSendRedirectRsp(&rsp, &newEpSet);
} else { } else {
if (terrno != 0) code = terrno; if (code != 1) {
vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code); if (terrno != 0) code = terrno;
SRpcMsg rsp = {.code = code, .info = pMsg->info}; vError("vgId:%d, msg:%p failed to propose since %s, code:0x%x", vgId, pMsg, tstrerror(code), code);
tmsgSendRsp(&rsp); SRpcMsg rsp = {.code = code, .info = pMsg->info};
tmsgSendRsp(&rsp);
}
} }
vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code); vGTrace("vgId:%d, msg:%p is freed, code:0x%x", vgId, pMsg, code);
@ -260,7 +281,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
assert(pSyncMsg != NULL); assert(pSyncMsg != NULL);
ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg); ret = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL);
syncClientRequestDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
} else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { } else if (pRpcMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
@ -359,34 +380,18 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
SyncIndex beginIndex = SYNC_INDEX_INVALID; SyncIndex beginIndex = SYNC_INDEX_INVALID;
char logBuf[256] = {0}; char logBuf[256] = {0};
if (pFsm->FpGetSnapshotInfo != NULL) { snprintf(logBuf, sizeof(logBuf),
(*pFsm->FpGetSnapshotInfo)(pFsm, &snapshot); "==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n",
beginIndex = snapshot.lastApplyIndex; pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
} beginIndex);
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
if (cbMeta.index > beginIndex) { SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
snprintf( rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
logBuf, sizeof(logBuf), memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
"==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld\n", syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex); rpcMsg.info.conn.applyIndex = cbMeta.index;
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen);
memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen);
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
char logBuf[256] = {0};
snprintf(logBuf, sizeof(logBuf),
"==callback== ==CommitCb== do not execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, "
"beginIndex :%ld\n",
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state),
beginIndex);
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
}
} }
static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {

View File

@ -28,13 +28,13 @@ extern "C" {
#include "trpc.h" #include "trpc.h"
#include "ttimer.h" #include "ttimer.h"
#define TIMER_MAX_MS 0x7FFFFFFF #define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000 #define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 1000 #define PING_TIMER_MS 1000
#define ELECT_TIMER_MS_MIN 1300 #define ELECT_TIMER_MS_MIN 1300
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2) #define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN) #define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
#define HEARTBEAT_TIMER_MS 900 #define HEARTBEAT_TIMER_MS 900
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0}) #define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})

View File

@ -50,7 +50,7 @@ typedef struct SSyncIO {
void *pSyncNode; void *pSyncNode;
int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg); int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg);
int32_t (*FpOnSyncPingReply)(SSyncNode *pSyncNode, SyncPingReply *pMsg); int32_t (*FpOnSyncPingReply)(SSyncNode *pSyncNode, SyncPingReply *pMsg);
int32_t (*FpOnSyncClientRequest)(SSyncNode *pSyncNode, SyncClientRequest *pMsg); int32_t (*FpOnSyncClientRequest)(SSyncNode *pSyncNode, SyncClientRequest *pMsg, SyncIndex *pRetIndex);
int32_t (*FpOnSyncRequestVote)(SSyncNode *pSyncNode, SyncRequestVote *pMsg); int32_t (*FpOnSyncRequestVote)(SSyncNode *pSyncNode, SyncRequestVote *pMsg);
int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg); int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg);
int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg); int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg);

View File

@ -169,7 +169,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
void syncNodeStart(SSyncNode* pSyncNode); void syncNodeStart(SSyncNode* pSyncNode);
void syncNodeStartStandBy(SSyncNode* pSyncNode); void syncNodeStartStandBy(SSyncNode* pSyncNode);
void syncNodeClose(SSyncNode* pSyncNode); void syncNodeClose(SSyncNode* pSyncNode);
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak); int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak);
// option // option
bool syncNodeSnapshotEnable(SSyncNode* pSyncNode); bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
@ -233,6 +233,7 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index);
SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index); SyncTerm syncNodeGetPreTerm(SSyncNode* pSyncNode, SyncIndex index);
int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm); int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncIndex* pPreIndex, SyncTerm* pPreTerm);
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg);
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag); int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag);
int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg); int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg);

View File

@ -49,14 +49,14 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg);
int32_t raftCfgPersist(SRaftCfg *pRaftCfg); int32_t raftCfgPersist(SRaftCfg *pRaftCfg);
int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex); int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex);
cJSON *syncCfg2Json(SSyncCfg *pSyncCfg); cJSON * syncCfg2Json(SSyncCfg *pSyncCfg);
char *syncCfg2Str(SSyncCfg *pSyncCfg); char * syncCfg2Str(SSyncCfg *pSyncCfg);
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg); char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg);
int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg); int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg);
int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg); int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg);
cJSON *raftCfg2Json(SRaftCfg *pRaftCfg); cJSON * raftCfg2Json(SRaftCfg *pRaftCfg);
char *raftCfg2Str(SRaftCfg *pRaftCfg); char * raftCfg2Str(SRaftCfg *pRaftCfg);
int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg); int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg);
int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg);

View File

@ -29,9 +29,12 @@ extern "C" {
#include "wal.h" #include "wal.h"
typedef struct SSyncLogStoreData { typedef struct SSyncLogStoreData {
SSyncNode* pSyncNode; SSyncNode* pSyncNode;
SWal* pWal; SWal* pWal;
TdThreadMutex mutex;
SWalReadHandle* pWalHandle; SWalReadHandle* pWalHandle;
// SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0 // SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0
} SSyncLogStoreData; } SSyncLogStoreData;

View File

@ -102,6 +102,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
} }
} }
// maybe execute fsm
if (newCommitIndex > pSyncNode->commitIndex) { if (newCommitIndex > pSyncNode->commitIndex) {
SyncIndex beginIndex = pSyncNode->commitIndex + 1; SyncIndex beginIndex = pSyncNode->commitIndex + 1;
SyncIndex endIndex = newCommitIndex; SyncIndex endIndex = newCommitIndex;

View File

@ -30,7 +30,7 @@ static int32_t syncIODestroy(SSyncIO *io);
static int32_t syncIOStartInternal(SSyncIO *io); static int32_t syncIOStartInternal(SSyncIO *io);
static int32_t syncIOStopInternal(SSyncIO *io); static int32_t syncIOStopInternal(SSyncIO *io);
static void *syncIOConsumerFunc(void *param); static void * syncIOConsumerFunc(void *param);
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
@ -242,9 +242,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
} }
static void *syncIOConsumerFunc(void *param) { static void *syncIOConsumerFunc(void *param) {
SSyncIO *io = param; SSyncIO * io = param;
STaosQall *qall; STaosQall *qall;
SRpcMsg *pRpcMsg, rpcMsg; SRpcMsg * pRpcMsg, rpcMsg;
qall = taosAllocateQall(); qall = taosAllocateQall();
while (1) { while (1) {
@ -281,7 +281,7 @@ static void *syncIOConsumerFunc(void *param) {
if (io->FpOnSyncClientRequest != NULL) { if (io->FpOnSyncClientRequest != NULL) {
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg); io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg, NULL);
syncClientRequestDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
} }

View File

@ -126,7 +126,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) { char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr); cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }

View File

@ -50,7 +50,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths);
// process message ---- // process message ----
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg); int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex);
// life cycle // life cycle
static void syncFreeNode(void* param); static void syncFreeNode(void* param);
@ -627,7 +627,7 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
return ret; return ret;
} }
int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak) { int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
int32_t ret = 0; int32_t ret = 0;
char eventLog[128]; char eventLog[128];
@ -664,13 +664,34 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak)
SRpcMsg rpcMsg; SRpcMsg rpcMsg;
syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg); syncClientRequest2RpcMsg(pSyncMsg, &rpcMsg);
if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) { // optimized one replica
ret = 0; if (syncNodeIsOptimizedOneReplica(pSyncNode, pMsg)) {
SyncIndex retIndex;
int32_t code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, &retIndex);
if (code == 0) {
pMsg->info.conn.applyIndex = retIndex;
rpcFreeCont(rpcMsg.pCont);
syncRespMgrDel(pSyncNode->pSyncRespMgr, seqNum);
ret = 1;
sDebug("vgId:%d optimized index:%ld success, msgtype:%s,%d", pSyncNode->vgId, retIndex,
TMSG_INFO(pMsg->msgType), pMsg->msgType);
} else {
ret = -1;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("vgId:%d optimized index:%ld error, msgtype:%s,%d", pSyncNode->vgId, retIndex, TMSG_INFO(pMsg->msgType),
pMsg->msgType);
}
} else { } else {
ret = -1; if (pSyncNode->FpEqMsg != NULL && (*pSyncNode->FpEqMsg)(pSyncNode->msgcb, &rpcMsg) == 0) {
terrno = TSDB_CODE_SYN_INTERNAL_ERROR; ret = 0;
sError("syncPropose pSyncNode->FpEqMsg is NULL"); } else {
ret = -1;
terrno = TSDB_CODE_SYN_INTERNAL_ERROR;
sError("enqueue msg error, FpEqMsg is NULL");
}
} }
syncClientRequestDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
goto _END; goto _END;
@ -2377,7 +2398,7 @@ int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
// /\ UNCHANGED <<messages, serverVars, candidateVars, // /\ UNCHANGED <<messages, serverVars, candidateVars,
// leaderVars, commitIndex>> // leaderVars, commitIndex>>
// //
int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncIndex* pRetIndex) {
int32_t ret = 0; int32_t ret = 0;
syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg);
@ -2436,6 +2457,14 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
rpcFreeCont(rpcMsg.pCont); rpcFreeCont(rpcMsg.pCont);
} }
if (pRetIndex != NULL) {
if (ret == 0 && pEntry != NULL) {
*pRetIndex = pEntry->index;
} else {
*pRetIndex = SYNC_INDEX_INVALID;
}
}
syncEntryDestory(pEntry); syncEntryDestory(pEntry);
return ret; return ret;
} }
@ -2600,6 +2629,10 @@ static int32_t syncNodeProposeConfigChangeFinish(SSyncNode* ths, SyncReconfigFin
return 0; return 0;
} }
bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg) {
return (ths->replicaNum == 1 && syncUtilUserCommit(pMsg->msgType) && ths->vgId != 1);
}
int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) { int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag) {
int32_t code = 0; int32_t code = 0;
ESyncState state = flag; ESyncState state = flag;
@ -2621,19 +2654,33 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
syncEntry2OriginalRpc(pEntry, &rpcMsg); syncEntry2OriginalRpc(pEntry, &rpcMsg);
// user commit // user commit
if (ths->pFsm->FpCommitCb != NULL && syncUtilUserCommit(pEntry->originalRpcType)) { if ((ths->pFsm->FpCommitCb != NULL) && syncUtilUserCommit(pEntry->originalRpcType)) {
SFsmCbMeta cbMeta = {0}; bool internalExecute = true;
cbMeta.index = pEntry->index; if ((ths->replicaNum == 1) && ths->restoreFinish && (ths->vgId != 1)) {
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); internalExecute = false;
cbMeta.isWeak = pEntry->isWeak; }
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.flag = flag;
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta); do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "index:%ld, internalExecute:%d", i, internalExecute);
syncNodeEventLog(ths, logBuf);
} while (0);
// execute fsm in apply thread, or execute outside syncPropose
if (internalExecute) {
SFsmCbMeta cbMeta = {0};
cbMeta.index = pEntry->index;
cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index);
cbMeta.isWeak = pEntry->isWeak;
cbMeta.code = 0;
cbMeta.state = ths->state;
cbMeta.seqNum = pEntry->seqNum;
cbMeta.term = pEntry->term;
cbMeta.currentTerm = ths->pRaftStore->currentTerm;
cbMeta.flag = flag;
ths->pFsm->FpCommitCb(ths->pFsm, &rpcMsg, cbMeta);
}
} }
// config change // config change

View File

@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char *syncCfg2Str(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) {
cJSON *pJson = syncCfg2Json(pSyncCfg); cJSON *pJson = syncCfg2Json(pSyncCfg);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) {
if (pSyncCfg != NULL) { if (pSyncCfg != NULL) {
int32_t len = 512; int32_t len = 512;
char *s = taosMemoryMalloc(len); char * s = taosMemoryMalloc(len);
memset(s, 0, len); memset(s, 0, len);
snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); snprintf(s, len, "{replica-num:%d, my-index:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex);
@ -205,7 +205,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char *raftCfg2Str(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) {
cJSON *pJson = raftCfg2Json(pRaftCfg); cJSON *pJson = raftCfg2Json(pRaftCfg);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
@ -271,7 +271,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring);
} }
cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg");
int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg));
ASSERT(code == 0); ASSERT(code == 0);

View File

@ -257,6 +257,8 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
return -1; return -1;
} }
taosThreadMutexLock(&(pData->mutex));
code = walReadWithHandle(pWalHandle, index); code = walReadWithHandle(pWalHandle, index);
if (code != 0) { if (code != 0) {
int32_t err = terrno; int32_t err = terrno;
@ -281,6 +283,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
terrno = saveErr; terrno = saveErr;
*/ */
taosThreadMutexUnlock(&(pData->mutex));
return code; return code;
} }
@ -301,6 +304,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
terrno = saveErr; terrno = saveErr;
*/ */
taosThreadMutexUnlock(&(pData->mutex));
return code; return code;
} }
@ -364,6 +368,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pData->pWal = pSyncNode->pWal; pData->pWal = pSyncNode->pWal;
ASSERT(pData->pWal != NULL); ASSERT(pData->pWal != NULL);
taosThreadMutexInit(&(pData->mutex), NULL);
pData->pWalHandle = walOpenReadHandle(pData->pWal); pData->pWalHandle = walOpenReadHandle(pData->pWal);
ASSERT(pData->pWalHandle != NULL); ASSERT(pData->pWalHandle != NULL);
@ -408,9 +413,14 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
void logStoreDestory(SSyncLogStore* pLogStore) { void logStoreDestory(SSyncLogStore* pLogStore) {
if (pLogStore != NULL) { if (pLogStore != NULL) {
SSyncLogStoreData* pData = pLogStore->data; SSyncLogStoreData* pData = pLogStore->data;
taosThreadMutexLock(&(pData->mutex));
if (pData->pWalHandle != NULL) { if (pData->pWalHandle != NULL) {
walCloseReadHandle(pData->pWalHandle); walCloseReadHandle(pData->pWalHandle);
pData->pWalHandle = NULL;
} }
taosThreadMutexUnlock(&(pData->mutex));
taosThreadMutexDestroy(&(pData->mutex));
taosMemoryFree(pLogStore->data); taosMemoryFree(pLogStore->data);
taosMemoryFree(pLogStore); taosMemoryFree(pLogStore);
@ -460,6 +470,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SWal* pWal = pData->pWal; SWal* pWal = pData->pWal;
if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) { if (index >= SYNC_INDEX_BEGIN && index <= logStoreLastIndex(pLogStore)) {
taosThreadMutexLock(&(pData->mutex));
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal); // SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle* pWalHandle = pData->pWalHandle; SWalReadHandle* pWalHandle = pData->pWalHandle;
ASSERT(pWalHandle != NULL); ASSERT(pWalHandle != NULL);
@ -503,6 +515,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
terrno = saveErr; terrno = saveErr;
*/ */
taosThreadMutexUnlock(&(pData->mutex));
return pEntry; return pEntry;
} else { } else {

View File

@ -216,7 +216,7 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
char *raftStore2Str(SRaftStore *pRaftStore) { char *raftStore2Str(SRaftStore *pRaftStore) {
cJSON *pJson = raftStore2Json(pRaftStore); cJSON *pJson = raftStore2Json(pRaftStore);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }

View File

@ -314,14 +314,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender); cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
int32_t len = 256; int32_t len = 256;
char *s = taosMemoryMalloc(len); char * s = taosMemoryMalloc(len);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
char host[128]; char host[128];
@ -461,7 +461,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf); cJSON_AddStringToObject(pFromId, "addr", u64buf);
{ {
uint64_t u64 = pReceiver->fromId.addr; uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId; cJSON * pTmp = pFromId;
char host[128] = {0}; char host[128] = {0};
uint16_t port; uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port); syncUtilU642Addr(u64, host, sizeof(host), &port);
@ -494,14 +494,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver); cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
int32_t len = 256; int32_t len = 256;
char *s = taosMemoryMalloc(len); char * s = taosMemoryMalloc(len);
SRaftId fromId = pReceiver->fromId; SRaftId fromId = pReceiver->fromId;
char host[128]; char host[128];

View File

@ -127,7 +127,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
char *voteGranted2Str(SVotesGranted *pVotesGranted) { char *voteGranted2Str(SVotesGranted *pVotesGranted) {
cJSON *pJson = voteGranted2Json(pVotesGranted); cJSON *pJson = voteGranted2Json(pVotesGranted);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
@ -256,7 +256,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
char *votesRespond2Str(SVotesRespond *pVotesRespond) { char *votesRespond2Str(SVotesRespond *pVotesRespond) {
cJSON *pJson = votesRespond2Json(pVotesRespond); cJSON *pJson = votesRespond2Json(pVotesRespond);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }

View File

@ -113,7 +113,7 @@ void test2() {
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore; pSyncNode->pLogStore = pLogStore;
//pLogStore->syncLogSetBeginIndex(pLogStore, 5); // pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4); pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4);
logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest2 ----- ", pLogStore);
@ -229,7 +229,7 @@ void test4() {
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore; pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest4 ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest4 ----- ", pLogStore);
//pLogStore->syncLogSetBeginIndex(pLogStore, 5); // pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4); pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4);
for (int i = 5; i <= 9; ++i) { for (int i = 5; i <= 9; ++i) {
@ -291,7 +291,7 @@ void test5() {
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore; pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore);
//pLogStore->syncLogSetBeginIndex(pLogStore, 5); // pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4); pLogStore->syncLogRestoreFromSnapshot(pLogStore, 4);
for (int i = 5; i <= 9; ++i) { for (int i = 5; i <= 9; ++i) {
@ -412,26 +412,23 @@ void test6() {
do { do {
SyncIndex firstVer = walGetFirstVer(pWal); SyncIndex firstVer = walGetFirstVer(pWal);
SyncIndex lastVer = walGetLastVer(pWal); SyncIndex lastVer = walGetLastVer(pWal);
bool isEmpty = walIsEmpty(pWal); bool isEmpty = walIsEmpty(pWal);
printf("before -------- firstVer:%ld lastVer:%ld isEmpty:%d \n", firstVer, lastVer, isEmpty); printf("before -------- firstVer:%ld lastVer:%ld isEmpty:%d \n", firstVer, lastVer, isEmpty);
} while (0); } while (0);
logStoreDestory(pLogStore); logStoreDestory(pLogStore);
cleanup(); cleanup();
// restart // restart
init(); init();
pLogStore = logStoreCreate(pSyncNode); pLogStore = logStoreCreate(pSyncNode);
assert(pLogStore); assert(pLogStore);
pSyncNode->pLogStore = pLogStore; pSyncNode->pLogStore = pLogStore;
do { do {
SyncIndex firstVer = walGetFirstVer(pWal); SyncIndex firstVer = walGetFirstVer(pWal);
SyncIndex lastVer = walGetLastVer(pWal); SyncIndex lastVer = walGetLastVer(pWal);
bool isEmpty = walIsEmpty(pWal); bool isEmpty = walIsEmpty(pWal);
printf("after -------- firstVer:%ld lastVer:%ld isEmpty:%d \n", firstVer, lastVer, isEmpty); printf("after -------- firstVer:%ld lastVer:%ld isEmpty:%d \n", firstVer, lastVer, isEmpty);
} while (0); } while (0);
@ -461,13 +458,13 @@ int main(int argc, char** argv) {
} }
sTrace("gAssert : %d", gAssert); sTrace("gAssert : %d", gAssert);
/* /*
test1(); test1();
test2(); test2();
test3(); test3();
test4(); test4();
test5(); test5();
*/ */
test6(); test6();
return 0; return 0;

View File

@ -312,7 +312,7 @@ void test5() {
pSyncNode->pLogStore = pLogStore; pSyncNode->pLogStore = pLogStore;
logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore); logStoreLog2((char*)"\n\n\ntest5 ----- ", pLogStore);
//pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, 6); // pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, 6);
pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, 5); pLogStore->syncLogRestoreFromSnapshot(pSyncNode->pLogStore, 5);
for (int i = 6; i <= 10; ++i) { for (int i = 6; i <= 10; ++i) {
int32_t dataLen = 10; int32_t dataLen = 10;