refactor(sync): add SyncClientRequestBatch
This commit is contained in:
parent
1a8cf049b7
commit
376bf46a64
|
@ -232,6 +232,7 @@ enum {
|
|||
TD_DEF_MSG_TYPE(TDMT_SYNC_PING, "sync-ping", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_PING_REPLY, "sync-ping-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST, "sync-client-request", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_BATCH, "sync-client-request-batch", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_CLIENT_REQUEST_REPLY, "sync-client-request-reply", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE, "sync-request-vote", NULL, NULL)
|
||||
TD_DEF_MSG_TYPE(TDMT_SYNC_REQUEST_VOTE_REPLY, "sync-request-vote-reply", NULL, NULL)
|
||||
|
|
|
@ -202,6 +202,7 @@ SyncGroupId syncGetVgId(int64_t rid);
|
|||
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
|
||||
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
|
||||
int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize);
|
||||
bool syncEnvIsStart();
|
||||
const char* syncStr(ESyncState state);
|
||||
bool syncIsRestoreFinish(int64_t rid);
|
||||
|
|
|
@ -219,6 +219,33 @@ void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg);
|
|||
void syncClientRequestLog(const SyncClientRequest* pMsg);
|
||||
void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef struct SOffsetAndContLen {
|
||||
int32_t offset;
|
||||
int32_t contLen;
|
||||
} SOffsetAndContLen;
|
||||
|
||||
typedef struct SRaftMeta {
|
||||
uint64_t seqNum;
|
||||
bool isWeak;
|
||||
} SRaftMeta;
|
||||
|
||||
// block1:
|
||||
// block2: SRaftMeta array
|
||||
// block3: rpc msg array (with pCont)
|
||||
|
||||
typedef struct SyncClientRequestBatch {
|
||||
uint32_t bytes;
|
||||
int32_t vgId;
|
||||
uint32_t msgType; // SyncClientRequestBatch msgType
|
||||
uint32_t dataCount;
|
||||
uint32_t dataLen; // user RpcMsg.contLen
|
||||
char data[]; // user RpcMsg.pCont
|
||||
} SyncClientRequestBatch;
|
||||
|
||||
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize,
|
||||
int32_t vgId);
|
||||
|
||||
// ---------------------------------------------
|
||||
typedef struct SyncClientRequestReply {
|
||||
uint32_t bytes;
|
||||
|
@ -325,10 +352,14 @@ void syncAppendEntriesLog(const SyncAppendEntries* pMsg);
|
|||
void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg);
|
||||
|
||||
// ---------------------------------------------
|
||||
|
||||
// define ahead
|
||||
/*
|
||||
typedef struct SOffsetAndContLen {
|
||||
int32_t offset;
|
||||
int32_t contLen;
|
||||
} SOffsetAndContLen;
|
||||
*/
|
||||
|
||||
typedef struct SyncAppendEntriesBatch {
|
||||
uint32_t bytes;
|
||||
|
|
|
@ -627,6 +627,8 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
|
|||
return ret;
|
||||
}
|
||||
|
||||
int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize) { return 0; }
|
||||
|
||||
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
||||
int32_t ret = 0;
|
||||
|
||||
|
|
|
@ -956,6 +956,45 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) {
|
|||
}
|
||||
}
|
||||
|
||||
// ---- message process SyncClientRequestBatch----
|
||||
|
||||
// block1:
|
||||
// block2: SRaftMeta array
|
||||
// block3: rpc msg array (with pCont)
|
||||
|
||||
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize,
|
||||
int32_t vgId) {
|
||||
ASSERT(rpcMsgArr != NULL);
|
||||
ASSERT(arrSize > 0);
|
||||
|
||||
int32_t dataLen = 0;
|
||||
int32_t raftMetaArrayLen = sizeof(SRpcMsg) * arrSize;
|
||||
int32_t rpcArrayLen = sizeof(SRaftMeta) * arrSize;
|
||||
|
||||
uint32_t bytes = sizeof(SyncClientRequestBatch) + dataLen;
|
||||
SyncClientRequestBatch* pMsg = taosMemoryMalloc(bytes);
|
||||
memset(pMsg, 0, bytes);
|
||||
pMsg->bytes = bytes;
|
||||
pMsg->vgId = vgId;
|
||||
pMsg->msgType = TDMT_SYNC_CLIENT_REQUEST_BATCH;
|
||||
pMsg->dataCount = arrSize;
|
||||
pMsg->dataLen = dataLen;
|
||||
|
||||
SRaftMeta* raftMetaArr = (SRaftMeta*)(pMsg->data);
|
||||
SRpcMsg* msgArr = (SRpcMsg*)((char*)(pMsg->data) + raftMetaArrayLen);
|
||||
|
||||
for (int i = 0; i < arrSize; ++i) {
|
||||
// init raftMetaArr
|
||||
raftMetaArr[i].isWeak = raftArr[i].isWeak;
|
||||
raftMetaArr[i].seqNum = raftArr[i].seqNum;
|
||||
|
||||
// init msgArr
|
||||
msgArr[i] = rpcMsgArr[i];
|
||||
}
|
||||
|
||||
return pMsg;
|
||||
}
|
||||
|
||||
// ---- message process SyncRequestVote----
|
||||
SyncRequestVote* syncRequestVoteBuild(int32_t vgId) {
|
||||
uint32_t bytes = sizeof(SyncRequestVote);
|
||||
|
|
Loading…
Reference in New Issue