sync append entries (sync-io)
This commit is contained in:
parent
09e2f36ddd
commit
a69330b68d
|
@ -34,11 +34,11 @@ extern "C" {
|
|||
|
||||
typedef struct SSyncIO {
|
||||
STaosQueue *pMsgQ;
|
||||
STaosQset * pQset;
|
||||
STaosQset *pQset;
|
||||
pthread_t consumerTid;
|
||||
|
||||
void * serverRpc;
|
||||
void * clientRpc;
|
||||
void *serverRpc;
|
||||
void *clientRpc;
|
||||
SEpSet myAddr;
|
||||
|
||||
tmr_h qTimer;
|
||||
|
@ -50,6 +50,7 @@ typedef struct SSyncIO {
|
|||
void *pSyncNode;
|
||||
int32_t (*FpOnSyncPing)(SSyncNode *pSyncNode, SyncPing *pMsg);
|
||||
int32_t (*FpOnSyncPingReply)(SSyncNode *pSyncNode, SyncPingReply *pMsg);
|
||||
int32_t (*FpOnSyncClientRequest)(SSyncNode *pSyncNode, SyncClientRequest *pMsg);
|
||||
int32_t (*FpOnSyncRequestVote)(SSyncNode *pSyncNode, SyncRequestVote *pMsg);
|
||||
int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg);
|
||||
int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg);
|
||||
|
|
|
@ -29,7 +29,7 @@ static int32_t syncIODestroy(SSyncIO *io);
|
|||
static int32_t syncIOStartInternal(SSyncIO *io);
|
||||
static int32_t syncIOStopInternal(SSyncIO *io);
|
||||
|
||||
static void * syncIOConsumerFunc(void *param);
|
||||
static void *syncIOConsumerFunc(void *param);
|
||||
static void syncIOProcessRequest(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
static void syncIOProcessReply(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet);
|
||||
static int32_t syncIOAuth(void *parent, char *meterId, char *spi, char *encrypt, char *secret, char *ckey);
|
||||
|
@ -234,9 +234,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
|
|||
}
|
||||
|
||||
static void *syncIOConsumerFunc(void *param) {
|
||||
SSyncIO * io = param;
|
||||
SSyncIO *io = param;
|
||||
STaosQall *qall;
|
||||
SRpcMsg * pRpcMsg, rpcMsg;
|
||||
SRpcMsg *pRpcMsg, rpcMsg;
|
||||
qall = taosAllocateQall();
|
||||
|
||||
while (1) {
|
||||
|
@ -273,6 +273,13 @@ static void *syncIOConsumerFunc(void *param) {
|
|||
syncPingReplyDestroy(pSyncMsg);
|
||||
}
|
||||
|
||||
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) {
|
||||
if (io->FpOnSyncClientRequest != NULL) {
|
||||
SyncClientRequest *pSyncMsg = syncClientRequestFromRpcMsg2(pRpcMsg);
|
||||
io->FpOnSyncClientRequest(io->pSyncNode, pSyncMsg);
|
||||
syncClientRequestDestroy(pSyncMsg);
|
||||
}
|
||||
|
||||
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) {
|
||||
if (io->FpOnSyncRequestVote != NULL) {
|
||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pRpcMsg);
|
||||
|
|
Loading…
Reference in New Issue