sync integration add SyncApplyMsg

This commit is contained in:
Minghao Li 2022-04-20 11:51:00 +08:00
parent 704671d2fe
commit 6035f0305d
2 changed files with 60 additions and 1 deletions

View File

@ -363,6 +363,37 @@ void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg); void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg);
void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg); void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg);
// ---------------------------------------------
typedef struct SyncApplyMsg {
uint32_t bytes;
int32_t vgId;
uint32_t msgType; // user SyncApplyMsg msgType
uint32_t originalRpcType; // user RpcMsg msgType
SFsmCbMeta fsmMeta;
uint32_t dataLen; // user RpcMsg.contLen
char data[]; // user RpcMsg.pCont
} SyncApplyMsg;
SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen);
SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta);
void syncApplyMsgDestroy(SyncApplyMsg* pMsg);
void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen);
void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg);
char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len);
SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len);
void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg); // SyncApplyMsg to SRpcMsg, put it into ApplyQ
void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg); // get SRpcMsg from ApplyQ, to SyncApplyMsg
void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg); // SyncApplyMsg to OriginalRpcMsg
SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg);
cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg);
char* syncApplyMsg2Str(const SyncApplyMsg* pMsg);
// for debug ----------------------
void syncApplyMsgPrint(const SyncApplyMsg* pMsg);
void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg);
void ssyncApplyMsgLog(const SyncApplyMsg* pMsg);
void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg);
// on message ---------------------- // on message ----------------------
int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg); int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg); int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg);
@ -373,7 +404,7 @@ int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg)
int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
//--------------------- // ---------------------------------------------
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -1531,3 +1531,31 @@ void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg) {
sTrace("syncAppendEntriesReplyLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized); sTrace("syncAppendEntriesReplyLog2 | len:%lu | %s | %s", strlen(serialized), s, serialized);
taosMemoryFree(serialized); taosMemoryFree(serialized);
} }
// ---- message process SyncApplyMsg----
SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen) { return NULL; }
SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta) { return NULL; }
void syncApplyMsgDestroy(SyncApplyMsg* pMsg) {}
void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen) {}
void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg) {}
char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len) { return NULL; }
SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len) { return NULL; }
// SyncApplyMsg to SRpcMsg, put it into ApplyQ
void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg) {}
// get SRpcMsg from ApplyQ, to SyncApplyMsg
void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg) {}
// SyncApplyMsg to OriginalRpcMsg
void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg) {}
SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg) { return NULL; }
cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg) { return NULL; }
char* syncApplyMsg2Str(const SyncApplyMsg* pMsg) { return NULL; }
// for debug ----------------------
void syncApplyMsgPrint(const SyncApplyMsg* pMsg) {}
void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg) {}
void ssyncApplyMsgLog(const SyncApplyMsg* pMsg) {}
void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg) {}