refactor(sync): add propose batch
This commit is contained in:
parent
64f4325bd7
commit
5ad0ac9500
|
@ -413,7 +413,7 @@ int32_t vnodeProcessSyncMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
|
SyncClientRequestBatch *pSyncMsg = syncClientRequestBatchFromRpcMsg(pMsg);
|
||||||
ASSERT(pSyncMsg != NULL);
|
ASSERT(pSyncMsg != NULL);
|
||||||
code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
|
code = syncNodeOnClientRequestBatchCb(pSyncNode, pSyncMsg);
|
||||||
syncClientRequestBatchDestroyDeep(pSyncMsg);
|
syncClientRequestBatchDestroy(pSyncMsg);
|
||||||
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
} else if (pMsg->msgType == TDMT_SYNC_REQUEST_VOTE) {
|
||||||
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
SyncRequestVote *pSyncMsg = syncRequestVoteFromRpcMsg2(pMsg);
|
||||||
ASSERT(pSyncMsg != NULL);
|
ASSERT(pSyncMsg != NULL);
|
||||||
|
|
|
@ -736,6 +736,13 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIs
|
||||||
|
|
||||||
SRaftMeta raftArr[SYNC_MAX_BATCH_SIZE];
|
SRaftMeta raftArr[SYNC_MAX_BATCH_SIZE];
|
||||||
for (int i = 0; i < arrSize; ++i) {
|
for (int i = 0; i < arrSize; ++i) {
|
||||||
|
do {
|
||||||
|
char eventLog[128];
|
||||||
|
snprintf(eventLog, sizeof(eventLog), "propose type:%s,%d, batch:%d", TMSG_INFO(pMsgPArr[i]->msgType),
|
||||||
|
pMsgPArr[i]->msgType, arrSize);
|
||||||
|
syncNodeEventLog(pSyncNode, eventLog);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
SRespStub stub;
|
SRespStub stub;
|
||||||
stub.createTime = taosGetTimestampMs();
|
stub.createTime = taosGetTimestampMs();
|
||||||
stub.rpcMsg = *(pMsgPArr[i]);
|
stub.rpcMsg = *(pMsgPArr[i]);
|
||||||
|
@ -790,9 +797,11 @@ int32_t syncNodeProposeBatch(SSyncNode* pSyncNode, SRpcMsg** pMsgPArr, bool* pIs
|
||||||
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
int32_t syncNodePropose(SSyncNode* pSyncNode, SRpcMsg* pMsg, bool isWeak) {
|
||||||
int32_t ret = 0;
|
int32_t ret = 0;
|
||||||
|
|
||||||
char eventLog[128];
|
do {
|
||||||
snprintf(eventLog, sizeof(eventLog), "propose type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
char eventLog[128];
|
||||||
syncNodeEventLog(pSyncNode, eventLog);
|
snprintf(eventLog, sizeof(eventLog), "propose type:%s,%d", TMSG_INFO(pMsg->msgType), pMsg->msgType);
|
||||||
|
syncNodeEventLog(pSyncNode, eventLog);
|
||||||
|
} while (0);
|
||||||
|
|
||||||
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
if (pSyncNode->state == TAOS_SYNC_STATE_LEADER) {
|
||||||
if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) {
|
if (pSyncNode->changing && pMsg->msgType != TDMT_SYNC_CONFIG_CHANGE_FINISH) {
|
||||||
|
|
Loading…
Reference in New Issue