From 376bf46a64dd515d7cf043f2d055261f968b0409 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Thu, 30 Jun 2022 15:28:23 +0800 Subject: [PATCH] refactor(sync): add SyncClientRequestBatch --- include/common/tmsgdef.h | 1 + include/libs/sync/sync.h | 1 + include/libs/sync/syncTools.h | 31 ++++++++++++++++++++++++ source/libs/sync/src/syncMain.c | 2 ++ source/libs/sync/src/syncMessage.c | 39 ++++++++++++++++++++++++++++++ 5 files changed, 74 insertions(+) diff --git a/include/common/tmsgdef.h b/include/common/tmsgdef.h index 926d4eb51f..c0c1482828 100644 --- a/include/common/tmsgdef.h +++ b/include/common/tmsgdef.h @@ -232,6 +232,7 @@ enum { TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL) + TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_BATCH, "sync-client-request-batch", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_REPLY, "sync-client-request-reply", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE, "sync-request-vote", NULL, NULL) TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE_REPLY, "sync-request-vote-reply", NULL, NULL) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 5fa7eed40c..dea6d35d4d 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -202,6 +202,7 @@ SyncGroupId syncGetVgId(int64_t rid); void syncGetEpSet(int64_t rid, SEpSet* pEpSet); void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet); int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak); +int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize); bool syncEnvIsStart(); const char* syncStr(ESyncState state); bool syncIsRestoreFinish(int64_t rid); diff --git a/include/libs/sync/syncTools.h b/include/libs/sync/syncTools.h index 8975f930db..c4ce9b9694 100644 --- a/include/libs/sync/syncTools.h +++ b/include/libs/sync/syncTools.h @@ -219,6 +219,33 @@ void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg); void syncClientRequestLog(const SyncClientRequest* pMsg); void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg); +// --------------------------------------------- +typedef struct SOffsetAndContLen { + int32_t offset; + int32_t contLen; +} SOffsetAndContLen; + +typedef struct SRaftMeta { + uint64_t seqNum; + bool isWeak; +} SRaftMeta; + +// block1: +// block2: SRaftMeta array +// block3: rpc msg array (with pCont) + +typedef struct SyncClientRequestBatch { + uint32_t bytes; + int32_t vgId; + uint32_t msgType; // SyncClientRequestBatch msgType + uint32_t dataCount; + uint32_t dataLen; // user RpcMsg.contLen + char data[]; // user RpcMsg.pCont +} SyncClientRequestBatch; + +SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize, + int32_t vgId); + // --------------------------------------------- typedef struct SyncClientRequestReply { uint32_t bytes; @@ -325,10 +352,14 @@ void syncAppendEntriesLog(const SyncAppendEntries* pMsg); void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg); // --------------------------------------------- + +// define ahead +/* typedef struct SOffsetAndContLen { int32_t offset; int32_t contLen; } SOffsetAndContLen; +*/ typedef struct SyncAppendEntriesBatch { uint32_t bytes; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index d32153e5ed..c065f74663 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -627,6 +627,8 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) { return ret; } +int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) { return 0; } + int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) { int32_t ret = 0; diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index acece6d48a..38ecb5c9d1 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -956,6 +956,45 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) { } } +// ---- message process SyncClientRequestBatch---- + +// block1: +// block2: SRaftMeta array +// block3: rpc msg array (with pCont) + +SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize, + int32_t vgId) { + ASSERT(rpcMsgArr != NULL); + ASSERT(arrSize > 0); + + int32_t dataLen = 0; + int32_t raftMetaArrayLen = sizeof(SRpcMsg) * arrSize; + int32_t rpcArrayLen = sizeof(SRaftMeta) * arrSize; + + uint32_t bytes = sizeof(SyncClientRequestBatch) + dataLen; + SyncClientRequestBatch* pMsg = taosMemoryMalloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->vgId = vgId; + pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST_BATCH; + pMsg->dataCount = arrSize; + pMsg->dataLen = dataLen; + + SRaftMeta* raftMetaArr = (SRaftMeta*)(pMsg->data); + SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen); + + for (int i = 0; i < arrSize; ++i) { + // init raftMetaArr + raftMetaArr[i].isWeak = raftArr[i].isWeak; + raftMetaArr[i].seqNum = raftArr[i].seqNum; + + // init msgArr + msgArr[i] = rpcMsgArr[i]; + } + + return pMsg; +} + // ---- message process SyncRequestVote---- SyncRequestVote* syncRequestVoteBuild(int32_t vgId) { uint32_t bytes = sizeof(SyncRequestVote);