From 43e2f8bc2bea1cf6a95741d764bcae9786ac1840 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 26 Jul 2022 10:59:56 +0800 Subject: [PATCH 1/3] refactor(sync): add pre-commit interface --- source/libs/sync/inc/syncInt.h | 1 + source/libs/sync/src/syncAppendEntries.c | 46 +++++------------------- source/libs/sync/src/syncMain.c | 33 ++--------------- 3 files changed, 12 insertions(+), 68 deletions(-) diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index 64f66e390a..b802d94bea 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -238,6 +238,7 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncInd bool syncNodeIsOptimizedOneReplica(SSyncNode* ths, SRpcMsg* pMsg); int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex, uint64_t flag); +int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code); int32_t syncNodeUpdateNewConfigIndex(SSyncNode* ths, SSyncCfg* pNewCfg); diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c index 4295abeaa1..7d0f53640c 100644 --- a/source/libs/sync/src/syncAppendEntries.c +++ b/source/libs/sync/src/syncAppendEntries.c @@ -244,22 +244,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); // pre commit - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); - if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) { - if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pAppendEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pAppendEntry->isWeak; - cbMeta.code = 2; - cbMeta.state = ths->state; - cbMeta.seqNum = pAppendEntry->seqNum; - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); - } - } - rpcFreeCont(rpcMsg.pCont); + syncNodePreCommit(ths, pAppendEntry, 0); } // free memory @@ -280,22 +265,7 @@ int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg) { ths->pLogStore->appendEntry(ths->pLogStore, pAppendEntry); // pre commit - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pAppendEntry, &rpcMsg); - if (ths->pFsm != NULL) { - // if (ths->pFsm->FpPreCommitCb != NULL && pAppendEntry->originalRpcType != TDMT_SYNC_NOOP) { - if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pAppendEntry->originalRpcType)) { - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pAppendEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pAppendEntry->isWeak; - cbMeta.code = 3; - cbMeta.state = ths->state; - cbMeta.seqNum = pAppendEntry->seqNum; - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); - } - } - rpcFreeCont(rpcMsg.pCont); + syncNodePreCommit(ths, pAppendEntry, 0); // free memory syncEntryDestory(pAppendEntry); @@ -440,7 +410,7 @@ static int32_t syncNodeDoMakeLogSame(SSyncNode* ths, SyncIndex FromIndex) { return code; } -static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { +int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry, int32_t code) { SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); @@ -456,7 +426,7 @@ static int32_t syncNodePreCommit(SSyncNode* ths, SSyncRaftEntry* pEntry) { cbMeta.index = pEntry->index; cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); cbMeta.isWeak = pEntry->isWeak; - cbMeta.code = 2; + cbMeta.code = code; cbMeta.state = ths->state; cbMeta.seqNum = pEntry->seqNum; ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); @@ -594,7 +564,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc return -1; } - code = syncNodePreCommit(ths, pAppendEntry); + code = syncNodePreCommit(ths, pAppendEntry, 0); ASSERT(code == 0); // syncEntryDestory(pAppendEntry); @@ -715,7 +685,7 @@ int32_t syncNodeOnAppendEntriesSnapshot2Cb(SSyncNode* ths, SyncAppendEntriesBatc return -1; } - code = syncNodePreCommit(ths, pAppendEntry); + code = syncNodePreCommit(ths, pAppendEntry, 0); ASSERT(code == 0); // syncEntryDestory(pAppendEntry); @@ -919,7 +889,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs } // pre commit - code = syncNodePreCommit(ths, pAppendEntry); + code = syncNodePreCommit(ths, pAppendEntry, 0); ASSERT(code == 0); // update match index @@ -1032,7 +1002,7 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs } // pre commit - code = syncNodePreCommit(ths, pAppendEntry); + code = syncNodePreCommit(ths, pAppendEntry, 0); ASSERT(code == 0); syncEntryDestory(pAppendEntry); diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index a453b2572c..63a404d5f6 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2504,23 +2504,11 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI } // pre commit + syncNodePreCommit(ths, pEntry, 0); + SRpcMsg rpcMsg; syncEntry2OriginalRpc(pEntry, &rpcMsg); - if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pEntry->isWeak; - cbMeta.code = 0; - cbMeta.state = ths->state; - cbMeta.seqNum = pEntry->seqNum; - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); - } - } - rpcFreeCont(rpcMsg.pCont); - // if only myself, maybe commit right now if (ths->replicaNum == 1) { syncMaybeAdvanceCommitIndex(ths); @@ -2528,22 +2516,7 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI } else { // pre commit - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pEntry, &rpcMsg); - - if (ths->pFsm != NULL) { - if (ths->pFsm->FpPreCommitCb != NULL && syncUtilUserPreCommit(pEntry->originalRpcType)) { - SFsmCbMeta cbMeta = {0}; - cbMeta.index = pEntry->index; - cbMeta.lastConfigIndex = syncNodeGetSnapshotConfigIndex(ths, cbMeta.index); - cbMeta.isWeak = pEntry->isWeak; - cbMeta.code = 1; - cbMeta.state = ths->state; - cbMeta.seqNum = pEntry->seqNum; - ths->pFsm->FpPreCommitCb(ths->pFsm, &rpcMsg, cbMeta); - } - } - rpcFreeCont(rpcMsg.pCont); + syncNodePreCommit(ths, pEntry, 0); } if (pRetIndex != NULL) { From 0addf0999693fcc1fcf3c5fc54e5c94049b0b942 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 26 Jul 2022 11:24:39 +0800 Subject: [PATCH 2/3] refactor(sync): add pre-commit interface --- source/dnode/vnode/src/vnd/vnodeSync.c | 33 +++++++++++++++++++------- source/libs/sync/src/syncCommit.c | 26 +++++--------------- 2 files changed, 31 insertions(+), 28 deletions(-) diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index a0e2354f51..2b760efba0 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -206,13 +206,13 @@ static void inline vnodeProposeBatchMsg(SVnode *pVnode, SRpcMsg **pMsgArr, bool } void vnodeProposeWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) { - SVnode *pVnode = pInfo->ahandle; - int32_t vgId = pVnode->config.vgId; - int32_t code = 0; - SRpcMsg *pMsg = NULL; - int32_t arrayPos = 0; - SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg*)); - bool *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool)); + SVnode *pVnode = pInfo->ahandle; + int32_t vgId = pVnode->config.vgId; + int32_t code = 0; + SRpcMsg *pMsg = NULL; + int32_t arrayPos = 0; + SRpcMsg **pMsgArr = taosMemoryCalloc(numOfMsgs, sizeof(SRpcMsg *)); + bool *pIsWeakArr = taosMemoryCalloc(numOfMsgs, sizeof(bool)); vTrace("vgId:%d, get %d msgs from vnode-write queue", vgId, numOfMsgs); for (int32_t msg = 0; msg < numOfMsgs; msg++) { @@ -506,7 +506,7 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType)); - if (cbMeta.code == 0) { + if (cbMeta.code == 0 && cbMeta.isWeak == 0) { SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); @@ -529,6 +529,23 @@ static void vnodeSyncPreCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMet vTrace("vgId:%d, pre-commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s", syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType)); + + if (cbMeta.code == 0 && cbMeta.isWeak == 1) { + SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; + rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); + memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); + syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info); + rpcMsg.info.conn.applyIndex = cbMeta.index; + rpcMsg.info.conn.applyTerm = cbMeta.term; + tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); + } else { + SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info}; + vError("vgId:%d, sync pre-commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync), + pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code)); + if (rsp.info.handle != NULL) { + tmsgSendRsp(&rsp); + } + } } static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) { diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c index b3cdd079a4..fd6577477f 100644 --- a/source/libs/sync/src/syncCommit.c +++ b/source/libs/sync/src/syncCommit.c @@ -67,11 +67,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { for (SyncIndex index = syncNodeGetLastIndex(pSyncNode); index > pSyncNode->commitIndex; --index) { bool agree = syncAgree(pSyncNode, index); - if (gRaftDetailLog) { - sTrace("syncMaybeAdvanceCommitIndex syncAgree:%d, index:%" PRId64 ", pSyncNode->commitIndex:%" PRId64, agree, - index, pSyncNode->commitIndex); - } - if (agree) { // term SSyncRaftEntry* pEntry = pSyncNode->pLogStore->getEntry(pSyncNode->pLogStore, index); @@ -82,20 +77,15 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { // update commit index newCommitIndex = index; - if (gRaftDetailLog) { - sTrace("syncMaybeAdvanceCommitIndex maybe to update, newCommitIndex:%" PRId64 - " commit, pSyncNode->commitIndex:%" PRId64, - newCommitIndex, pSyncNode->commitIndex); - } - syncEntryDestory(pEntry); break; } else { - if (gRaftDetailLog) { - sTrace("syncMaybeAdvanceCommitIndex can not commit due to term not equal, pEntry->term:%" PRIu64 - ", pSyncNode->pRaftStore->currentTerm:%" PRIu64, - pEntry->term, pSyncNode->pRaftStore->currentTerm); - } + do { + char logBuf[128]; + snprintf(logBuf, sizeof(logBuf), "can not commit due to term not equal, index:%ld, term:%lu", pEntry->index, + pEntry->term); + syncNodeEventLog(pSyncNode, logBuf); + } while (0); } syncEntryDestory(pEntry); @@ -107,10 +97,6 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) { SyncIndex beginIndex = pSyncNode->commitIndex + 1; SyncIndex endIndex = newCommitIndex; - if (gRaftDetailLog) { - sTrace("syncMaybeAdvanceCommitIndex sync commit %" PRId64, newCommitIndex); - } - // update commit index pSyncNode->commitIndex = newCommitIndex; From 49a4e83d484c32d839ee97561a7034920c7d9b65 Mon Sep 17 00:00:00 2001 From: Minghao Li Date: Tue, 26 Jul 2022 13:23:56 +0800 Subject: [PATCH 3/3] refactor(sync): add pre-commit interface --- source/libs/sync/src/syncMain.c | 3 --- 1 file changed, 3 deletions(-) diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 63a404d5f6..935d89b99b 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -2506,9 +2506,6 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI // pre commit syncNodePreCommit(ths, pEntry, 0); - SRpcMsg rpcMsg; - syncEntry2OriginalRpc(pEntry, &rpcMsg); - // if only myself, maybe commit right now if (ths->replicaNum == 1) { syncMaybeAdvanceCommitIndex(ths);