diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index b2c743831a..48ce6c3d10 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -698,6 +698,13 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg); int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg); int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg); +int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg); +int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); +int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg); +int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg); +int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg); + // ----------------------------------------- typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg); typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index e89d52c99c..746e0959a8 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -385,6 +385,84 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { vGTrace("vgId:%d, sync msg:%p will be processed, type:%s", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType)); + + if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { + SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnTimeoutCb(pSyncNode, pSyncMsg); + syncTimeoutDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_PING) { + SyncPing *pSyncMsg = syncPingFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnPingCb(pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_PING_REPLY) { + SyncPingReply *pSyncMsg = syncPingReplyFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnPingReplyCb(pSyncNode, pSyncMsg); + syncPingReplyDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST) { + SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnClientRequestCb(pSyncNode, pSyncMsg, NULL); + syncClientRequestDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_CLIENT_REQUEST_BATCH) { + SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg); + syncClientRequestBatchDestroyDeep(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) { + SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnRequestVote(pSyncNode, pSyncMsg); + syncRequestVoteDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE_REPLY) { + SyncRequestVoteReply *pSyncMsg = syncRequestVoteReplyFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnRequestVoteReply(pSyncNode, pSyncMsg); + syncRequestVoteReplyDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES) { + SyncAppendEntries *pSyncMsg = syncAppendEntriesFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnAppendEntries(pSyncNode, pSyncMsg); + syncAppendEntriesDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_APPEND_ENTRIES_REPLY) { + SyncAppendEntriesReply *pSyncMsg = syncAppendEntriesReplyFromRpcMsg2(pMsg); + ASSERT(pSyncMsg != NULL); + code = syncNodeOnAppendEntriesReply(pSyncNode, pSyncMsg); + syncAppendEntriesReplyDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_SEND) { + SyncSnapshotSend *pSyncMsg = syncSnapshotSendFromRpcMsg2(pMsg); + code = syncNodeOnSnapshot(pSyncNode, pSyncMsg); + syncSnapshotSendDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_SNAPSHOT_RSP) { + SyncSnapshotRsp *pSyncMsg = syncSnapshotRspFromRpcMsg2(pMsg); + code = syncNodeOnSnapshotReply(pSyncNode, pSyncMsg); + syncSnapshotRspDestroy(pSyncMsg); + + } else if (pMsg->msgType == TDMT_SYNC_SET_VNODE_STANDBY) { + code = vnodeSetStandBy(pVnode); + if (code != 0 && terrno != 0) code = terrno; + SRpcMsg rsp = {.code = code, .info = pMsg->info}; + tmsgSendRsp(&rsp); + + } else { + vGError("vgId:%d, msg:%p failed to process since error msg type:%d", pVnode->config.vgId, pMsg, pMsg->msgType); + code = -1; + } + + +#if 0 if (syncNodeStrategy(pSyncNode) == SYNC_STRATEGY_NO_SNAPSHOT) { if (pMsg->msgType == TDMT_SYNC_TIMEOUT) { SyncTimeout *pSyncMsg = syncTimeoutFromRpcMsg2(pMsg); @@ -506,6 +584,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { code = -1; } } +#endif vTrace("vgId:%d, sync msg:%p is processed, type:%s code:0x%x", pVnode->config.vgId, pMsg, TMSG_INFO(pMsg->msgType), code); diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h index e15c85d73b..87bb1c7b05 100644 --- a/source/libs/sync/inc/syncAppendEntries.h +++ b/source/libs/sync/inc/syncAppendEntries.h @@ -96,6 +96,8 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatch* pMsg); +int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h index 03148252fb..c3316aec49 100644 --- a/source/libs/sync/inc/syncAppendEntriesReply.h +++ b/source/libs/sync/inc/syncAppendEntriesReply.h @@ -44,6 +44,8 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t syncNodeOnAppendEntriesReplySnapshot2Cb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index e6f802f986..7b34718ad9 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -76,7 +76,7 @@ typedef struct SSyncTimer { uint64_t counter; int32_t timerMS; SRaftId destId; - void *pData; + void* pData; } SSyncTimer; int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId destId); diff --git a/source/libs/sync/inc/syncRequestVote.h b/source/libs/sync/inc/syncRequestVote.h index 3fe8dc0237..fc32e72419 100644 --- a/source/libs/sync/inc/syncRequestVote.h +++ b/source/libs/sync/inc/syncRequestVote.h @@ -52,6 +52,8 @@ extern "C" { int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg); +int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncRequestVoteReply.h b/source/libs/sync/inc/syncRequestVoteReply.h index ac47a8d026..6ec7524149 100644 --- a/source/libs/sync/inc/syncRequestVoteReply.h +++ b/source/libs/sync/inc/syncRequestVoteReply.h @@ -47,6 +47,8 @@ extern "C" { int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); +int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncSnapshot.h b/source/libs/sync/inc/syncSnapshot.h index 1ec9e9f2f1..9ae260a308 100644 --- a/source/libs/sync/inc/syncSnapshot.h +++ b/source/libs/sync/inc/syncSnapshot.h @@ -97,6 +97,9 @@ char * snapshotReceiver2SimpleStr(SSyncSnapshotReceiver *pReceiver, char *event) int32_t syncNodeOnSnapshotSendCb(SSyncNode *ths, SyncSnapshotSend *pMsg); int32_t syncNodeOnSnapshotRspCb(SSyncNode *ths, SyncSnapshotRsp *pMsg); +int32_t syncNodeOnSnapshot(SSyncNode *ths, SyncSnapshotSend *pMsg); +int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index e000ba8bf8..b8bb6e5da7 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -1041,3 +1041,5 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs return ret; } + +int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg) { return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c index 9253ed0129..97d8fdd626 100644 --- a/source/libs/sync/src/syncAppendEntriesReply.c +++ b/source/libs/sync/src/syncAppendEntriesReply.c @@ -413,4 +413,6 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries } while (0); return 0; -} \ No newline at end of file +} + +int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg) { return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 00b1ceb376..b7a8e35cf3 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -932,13 +932,12 @@ int32_t syncHbTimerInit(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer, SRaftId de int32_t syncHbTimerStart(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { int32_t ret = 0; if (syncEnvIsStart()) { - - SSyncHbTimerData *pData = taosMemoryMalloc(sizeof(SSyncHbTimerData)); + SSyncHbTimerData* pData = taosMemoryMalloc(sizeof(SSyncHbTimerData)); pData->pSyncNode = pSyncNode; pData->pTimer = pSyncTimer; pData->destId = pSyncTimer->destId; pData->logicClock = pSyncTimer->logicClock; - + pSyncTimer->pData = pData; taosTmrReset(pSyncTimer->timerCb, pSyncTimer->timerMS, pData, gSyncEnv->pTimerManager, &pSyncTimer->pTimer); } else { @@ -952,7 +951,7 @@ int32_t syncHbTimerStop(SSyncNode* pSyncNode, SSyncTimer* pSyncTimer) { atomic_add_fetch_64(&pSyncTimer->logicClock, 1); taosTmrStop(pSyncTimer->pTimer); pSyncTimer->pTimer = NULL; - //taosMemoryFree(pSyncTimer->pData); + // taosMemoryFree(pSyncTimer->pData); return ret; } diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c index 122a81930b..74c4babf18 100644 --- a/source/libs/sync/src/syncRequestVote.c +++ b/source/libs/sync/src/syncRequestVote.c @@ -213,4 +213,6 @@ int32_t syncNodeOnRequestVoteSnapshotCb(SSyncNode* ths, SyncRequestVote* pMsg) { syncRequestVoteReplyDestroy(pReply); return 0; -} \ No newline at end of file +} + +int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg) { return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c index 55553d5048..84b3df0bf5 100644 --- a/source/libs/sync/src/syncRequestVoteReply.c +++ b/source/libs/sync/src/syncRequestVoteReply.c @@ -155,4 +155,6 @@ int32_t syncNodeOnRequestVoteReplySnapshotCb(SSyncNode* ths, SyncRequestVoteRepl } return 0; -} \ No newline at end of file +} + +int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg) { return 0; } \ No newline at end of file diff --git a/source/libs/sync/src/syncSnapshot.c b/source/libs/sync/src/syncSnapshot.c index 39d10462cc..2014449faf 100644 --- a/source/libs/sync/src/syncSnapshot.c +++ b/source/libs/sync/src/syncSnapshot.c @@ -901,3 +901,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode *pSyncNode, SyncSnapshotRsp *pMsg) { return 0; } + +int32_t syncNodeOnSnapshot(SSyncNode *ths, SyncSnapshotSend *pMsg) { return 0; } + +int32_t syncNodeOnSnapshotReply(SSyncNode *ths, SyncSnapshotRsp *pMsg) { return 0; } \ No newline at end of file