refactor(sync): delete some code

This commit is contained in:
Minghao Li 2022-10-17 17:36:51 +08:00
parent 94d8c09e58
commit 8a6eaed6f6
4 changed files with 36 additions and 35 deletions

View File

@ -541,6 +541,16 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg); code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
syncSnapshotSendDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) { } else if (pMsg->msgType == TDMT_SYNC_SET_MNODE_STANDBY) {
code = syncSetStandby(pMgmt->sync); code = syncSetStandby(pMgmt->sync);
SRpcMsg rsp = {.code = code, .info = pMsg->info}; SRpcMsg rsp = {.code = code, .info = pMsg->info};

View File

@ -385,7 +385,6 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType));
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
@ -410,12 +409,6 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL); code = syncNodeOnClientRequest(pSyncNode, pSyncMsg, NULL);
syncClientRequestDestroy(pSyncMsg); syncClientRequestDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) {
SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
ASSERT(pSyncMsg != NULL);
code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
syncClientRequestBatchDestroyDeep(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
ASSERT(pSyncMsg != NULL); ASSERT(pSyncMsg != NULL);
@ -440,15 +433,15 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg); code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg);
syncAppendEntriesReplyDestroy(pSyncMsg); syncAppendEntriesReplyDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) {
SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg);
code = syncNodeOnSnapshot(pSyncNode, pSyncMsg); code = syncNodeOnSnapshot(pSyncNode, pSyncMsg);
syncSnapshotSendDestroy(pSyncMsg); syncSnapshotSendDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) {
SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg);
code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg);
syncSnapshotRspDestroy(pSyncMsg); syncSnapshotRspDestroy(pSyncMsg);
} else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) {
code = vnodeSetStandBy(pVnode); code = vnodeSetStandBy(pVnode);
@ -457,11 +450,10 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
tmsgSendRsp(&rsp); tmsgSendRsp(&rsp);
} else { } else {
vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType); vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType);
code = -1; code = -1;
} }
#if 0 #if 0
if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) { if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) {
if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { if (pMsg->msgType == TDMT_SYNC_TIMEOUT) {
@ -935,4 +927,3 @@ bool vnodeIsLeader(SVnode *pVnode) {
return true; return true;
} }

View File

@ -40,14 +40,14 @@ typedef struct SSyncSnapshotSender {
bool start; bool start;
int32_t seq; int32_t seq;
int32_t ack; int32_t ack;
void *pReader; void * pReader;
void *pCurrentBlock; void * pCurrentBlock;
int32_t blockLen; int32_t blockLen;
SSnapshotParam snapshotParam; SSnapshotParam snapshotParam;
SSnapshot snapshot; SSnapshot snapshot;
SSyncCfg lastConfig; SSyncCfg lastConfig;
int64_t sendingMS; int64_t sendingMS;
SSyncNode *pSyncNode; SSyncNode * pSyncNode;
int32_t replicaIndex; int32_t replicaIndex;
SyncTerm term; SyncTerm term;
SyncTerm privateTerm; SyncTerm privateTerm;
@ -65,8 +65,8 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t snapshotReSend(SSyncSnapshotSender *pSender); int32_t snapshotReSend(SSyncSnapshotSender *pSender);
cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender); cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender);
char *snapshotSender2Str(SSyncSnapshotSender *pSender); char * snapshotSender2Str(SSyncSnapshotSender *pSender);
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event); char * snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event);
int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId); int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId);
@ -74,13 +74,13 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId);
typedef struct SSyncSnapshotReceiver { typedef struct SSyncSnapshotReceiver {
bool start; bool start;
int32_t ack; int32_t ack;
void *pWriter; void * pWriter;
SyncTerm term; SyncTerm term;
SyncTerm privateTerm; SyncTerm privateTerm;
SSnapshotParam snapshotParam; SSnapshotParam snapshotParam;
SSnapshot snapshot; SSnapshot snapshot;
SRaftId fromId; SRaftId fromId;
SSyncNode *pSyncNode; SSyncNode * pSyncNode;
} SSyncSnapshotReceiver; } SSyncSnapshotReceiver;
@ -92,8 +92,8 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver)
void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver); void snapshotReceiverForceStop(SSyncSnapshotReceiver *pReceiver);
cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver); cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver); char * snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver);
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event); char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event);
//--------------------------------------------------- //---------------------------------------------------
// on message // on message

View File

@ -379,14 +379,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char *snapshotSender2Str(SSyncSnapshotSender *pSender) { char *snapshotSender2Str(SSyncSnapshotSender *pSender) {
cJSON *pJson = snapshotSender2Json(pSender); cJSON *pJson = snapshotSender2Json(pSender);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) { char *snapshotSender2SimpleStr(SSyncSnapshotSender *pSender, char *event) {
int32_t len = 256; int32_t len = 256;
char *s = taosMemoryMalloc(len); char * s = taosMemoryMalloc(len);
SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex]; SRaftId destId = pSender->pSyncNode->replicasId[pSender->replicaIndex];
char host[64]; char host[64];
@ -674,7 +674,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject(pFromId, "addr", u64buf); cJSON_AddStringToObject(pFromId, "addr", u64buf);
{ {
uint64_t u64 = pReceiver->fromId.addr; uint64_t u64 = pReceiver->fromId.addr;
cJSON *pTmp = pFromId; cJSON * pTmp = pFromId;
char host[128] = {0}; char host[128] = {0};
uint16_t port; uint16_t port;
syncUtilU642Addr(u64, host, sizeof(host), &port); syncUtilU642Addr(u64, host, sizeof(host), &port);
@ -707,14 +707,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) { char *snapshotReceiver2Str(SSyncSnapshotReceiver *pReceiver) {
cJSON *pJson = snapshotReceiver2Json(pReceiver); cJSON *pJson = snapshotReceiver2Json(pReceiver);
char *serialized = cJSON_Print(pJson); char * serialized = cJSON_Print(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);
return serialized; return serialized;
} }
char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) { char *snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) {
int32_t len = 256; int32_t len = 256;
char *s = taosMemoryMalloc(len); char * s = taosMemoryMalloc(len);
SRaftId fromId = pReceiver->fromId; SRaftId fromId = pReceiver->fromId;
char host[128]; char host[128];