From a69330b68d6fd1026cc8da9b0a8199433164d8b4 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 15 Mar 2022 14:07:45 +0800 Subject: [PATCH] sync append entries (sync-io) --- source/libs/sync/inc/syncIO.h | 7 ++++--- source/libs/sync/src/syncIO.c | 13 ++++++++++--- 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 352d30c8d7..09e93fda1c 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -34,11 +34,11 @@ extern "C" { typedef struct SSyncIO { STaosQueue *pMsgQ; - STaosQset * pQset; + STaosQset *pQset; pthread_t consumerTid; - void * serverRpc; - void * clientRpc; + void *serverRpc; + void *clientRpc; SEpSet myAddr; tmr_h qTimer; @@ -50,6 +50,7 @@ typedef struct SSyncIO { void *pSyncNode; int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg); int32_t (*FpOnSyncPingReply)(SSyncNode *pSyncNode, SyncPingReply *pMsg); + int32_t (*FpOnSyncClientRequest)(SSyncNode *pSyncNode, SyncClientRequest *pMsg); int32_t (*FpOnSyncRequestVote)(SSyncNode *pSyncNode, SyncRequestVote *pMsg); int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg); int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg); diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index c307ec5068..8176ac417a 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io); static int32_t syncIOStartInternal(SSyncIO *io); static int32_t syncIOStopInternal(SSyncIO *io); -static void * syncIOConsumerFunc(void *param); +static void *syncIOConsumerFunc(void *param); static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet); static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey); @@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) { } static void *syncIOConsumerFunc(void *param) { - SSyncIO * io = param; + SSyncIO *io = param; STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; qall = taosAllocateQall(); while (1) { @@ -273,6 +273,13 @@ static void *syncIOConsumerFunc(void *param) { syncPingReplyDestroy(pSyncMsg); } + } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) { + if (io->FpOnSyncClientRequest != NULL) { + SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg); + io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg); + syncClientRequestDestroy(pSyncMsg); + } + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { if (io->FpOnSyncRequestVote != NULL) { SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);