diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 019c291efc..85a82dcfb7 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -39,7 +39,6 @@ extern "C" { // /\ UNCHANGED <> // int32_t syncNodeRequestVotePeers(SSyncNode* pSyncNode); - int32_t syncNodeElect(SSyncNode* pSyncNode); int32_t syncNodeRequestVote(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncRequestVote* pMsg); diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 15c719b76e..5a9af83827 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -70,6 +70,9 @@ extern "C" { struct SyncTimeout; typedef struct SyncTimeout SyncTimeout; +struct SyncClientRequest; +typedef struct SyncClientRequest SyncClientRequest; + struct SyncPing; typedef struct SyncPing SyncPing; @@ -185,6 +188,7 @@ typedef struct SSyncNode { // callback int32_t (*FpOnPing)(SSyncNode* ths, SyncPing* pMsg); int32_t (*FpOnPingReply)(SSyncNode* ths, SyncPingReply* pMsg); + int32_t (*FpOnClientRequest)(SSyncNode* ths, SyncClientRequest* pMsg); int32_t (*FpOnRequestVote)(SSyncNode* ths, SyncRequestVote* pMsg); int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg); diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index aca6205b9d..6fe18dae38 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -52,7 +52,6 @@ extern "C" { // /\ UNCHANGED <> // int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); - int32_t syncNodeReplicate(SSyncNode* pSyncNode); int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, const SyncAppendEntries* pMsg); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index aaf6535f40..dd2c142104 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -42,6 +42,7 @@ static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); // on message ---- static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); +static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg); // --------------------------------- int32_t syncInit() { @@ -192,6 +193,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { // init callback pSyncNode->FpOnPing = syncNodeOnPingCb; pSyncNode->FpOnPingReply = syncNodeOnPingReplyCb; + pSyncNode->FpOnClientRequest = syncNodeOnClientRequestCb; pSyncNode->FpOnRequestVote = syncNodeOnRequestVoteCb; pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb; pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb; @@ -696,3 +698,19 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { syncPingReplyLog2("==syncNodeOnPingReplyCb==", pMsg); return ret; } + +static int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) { + int32_t ret = 0; + syncClientRequestLog2("==syncNodeOnClientRequestCb==", pMsg); + + if (ths->state == TAOS_SYNC_STATE_LEADER) { + SSyncRaftEntry* pEntry = syncEntryDeserialize(pMsg->data, pMsg->dataLen); + ths->pLogStore->appendEntry(ths->pLogStore, pEntry); + syncNodeReplicate(ths); + syncEntryDestory(pEntry); + } else { + // ths->pFsm->FpCommitCb(-1) + } + + return ret; +} \ No newline at end of file