From 4edef438eabb3c050ab36f84ba9f294ad3fa9909 Mon Sep 17 00:00:00 2001 From: Benguang Zhao Date: Sat, 26 Nov 2022 21:53:38 +0800 Subject: [PATCH] enh: transfer ownership of msgs while committing sync log entries --- include/libs/sync/sync.h | 10 +++++----- source/dnode/mgmt/mgmt_mnode/src/mmWorker.c | 2 ++ source/dnode/mgmt/mgmt_qnode/src/qmWorker.c | 2 ++ source/dnode/mgmt/mgmt_snode/src/smWorker.c | 1 + source/dnode/mgmt/mgmt_vnode/src/vmWorker.c | 2 +- source/dnode/mnode/impl/src/mndSync.c | 12 +++++++----- source/dnode/vnode/src/vnd/vnodeSync.c | 21 ++++++++------------- source/libs/sync/src/syncPipeline.c | 1 + source/libs/sync/src/syncRaftEntry.c | 2 +- 9 files changed, 28 insertions(+), 25 deletions(-) diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 7fb3a890cb..00935f280a 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -134,13 +134,13 @@ typedef struct SSnapshotMeta { typedef struct SSyncFSM { void* data; - void (*FpCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); - void (*FpPreCommitCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); - void (*FpRollBackCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); + void (*FpCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); + void (*FpPreCommitCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); + void (*FpRollBackCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); void (*FpRestoreFinishCb)(const struct SSyncFSM* pFsm); - void (*FpReConfigCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SReConfigCbMeta* pMeta); - void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, const SRpcMsg* pMsg, const SFsmCbMeta* pMeta); + void (*FpReConfigCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SReConfigCbMeta* pMeta); + void (*FpLeaderTransferCb)(const struct SSyncFSM* pFsm, SRpcMsg* pMsg, const SFsmCbMeta* pMeta); bool (*FpApplyQueueEmptyCb)(const struct SSyncFSM* pFsm); int32_t (*FpApplyQueueItems)(const struct SSyncFSM* pFsm); diff --git a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c index d3d92e1bbf..212de0bfb4 100644 --- a/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c +++ b/source/dnode/mgmt/mgmt_mnode/src/mmWorker.c @@ -162,11 +162,13 @@ int32_t mmPutMsgToQueue(SMnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); if (pMsg == NULL) return -1; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); + pRpc->pCont = NULL; dTrace("msg:%p, is created and will put into %s queue, type:%s", pMsg, pWorker->name, TMSG_INFO(pRpc->msgType)); int32_t code = mmPutMsgToWorker(pMgmt, pWorker, pMsg); if (code != 0) { dTrace("msg:%p, is freed", pMsg); + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); } return code; diff --git a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c index edbe9882a4..3e5ad65db7 100644 --- a/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c +++ b/source/dnode/mgmt/mgmt_qnode/src/qmWorker.c @@ -61,6 +61,7 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { SRpcMsg *pMsg = taosAllocateQitem(sizeof(SRpcMsg), RPC_QITEM); if (pMsg == NULL) return -1; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); + pRpc->pCont = NULL; switch (qtype) { case QUERY_QUEUE: @@ -74,6 +75,7 @@ int32_t qmPutRpcMsgToQueue(SQnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { return 0; default: terrno = TSDB_CODE_INVALID_PARA; + rpcFreeCont(pMsg->pCont); taosFreeQitem(pMsg); return -1; } diff --git a/source/dnode/mgmt/mgmt_snode/src/smWorker.c b/source/dnode/mgmt/mgmt_snode/src/smWorker.c index f6942c8114..2d2a121795 100644 --- a/source/dnode/mgmt/mgmt_snode/src/smWorker.c +++ b/source/dnode/mgmt/mgmt_snode/src/smWorker.c @@ -151,6 +151,7 @@ int32_t smPutMsgToQueue(SSnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { pHead->contLen = htonl(pHead->contLen); pHead->vgId = SNODE_HANDLE; memcpy(pMsg, pRpc, sizeof(SRpcMsg)); + pRpc->pCont = NULL; switch (qtype) { case STREAM_QUEUE: diff --git a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c index 4aa07cad98..08ea880b97 100644 --- a/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c +++ b/source/dnode/mgmt/mgmt_vnode/src/vmWorker.c @@ -246,12 +246,12 @@ int32_t vmPutRpcMsgToQueue(SVnodeMgmt *pMgmt, EQueueType qtype, SRpcMsg *pRpc) { pHead->contLen = htonl(pHead->contLen); pHead->vgId = htonl(pHead->vgId); memcpy(pMsg, pRpc, sizeof(SRpcMsg)); + pRpc->pCont = NULL; int32_t code = vmPutMsgToQueue(pMgmt, pMsg, qtype); if (code != 0) { dTrace("msg:%p, is freed", pMsg); rpcFreeCont(pMsg->pCont); - pRpc->pCont = NULL; taosFreeQitem(pMsg); } diff --git a/source/dnode/mnode/impl/src/mndSync.c b/source/dnode/mnode/impl/src/mndSync.c index 320cc10d40..10cd6416b4 100644 --- a/source/dnode/mnode/impl/src/mndSync.c +++ b/source/dnode/mnode/impl/src/mndSync.c @@ -72,15 +72,11 @@ static int32_t mndSyncSendMsg(const SEpSet *pEpSet, SRpcMsg *pMsg) { return code; } -void mndSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { +void mndProcessWriteMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { SMnode *pMnode = pFsm->data; SSyncMgmt *pMgmt = &pMnode->syncMgmt; SSdbRaw *pRaw = pMsg->pCont; - // delete msg handle - SRpcMsg rpcMsg = {0}; - rpcMsg.info = pMsg->info; - int32_t transId = sdbGetIdFromRaw(pMnode->pSdb, pRaw); pMgmt->errCode = pMeta->code; mInfo("trans:%d, is proposed, saved:%d code:0x%x, apply index:%" PRId64 " term:%" PRIu64 " config:%" PRId64 @@ -120,6 +116,12 @@ void mndSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMet } } +void mndSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { + mndProcessWriteMsg(pFsm, pMsg, pMeta); + rpcFreeCont(pMsg->pCont); + pMsg->pCont = NULL; +} + int32_t mndSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot, void *pReaderParam, void **ppReader) { mInfo("start to read snapshot from sdb in atomic way"); SMnode *pMnode = pFsm->data; diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index bc6eb81275..f5c1e1b169 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -295,36 +295,31 @@ static int32_t vnodeSyncGetSnapshot(const SSyncFSM *pFsm, SSnapshot *pSnapshot) return 0; } -static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { +static void vnodeSyncApplyMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { SVnode *pVnode = pFsm->data; - - SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen}; - rpcMsg.pCont = rpcMallocCont(rpcMsg.contLen); - memcpy(rpcMsg.pCont, pMsg->pCont, pMsg->contLen); - rpcMsg.info = pMsg->info; - rpcMsg.info.conn.applyIndex = pMeta->index; - rpcMsg.info.conn.applyTerm = pMeta->term; + pMsg->info.conn.applyIndex = pMeta->index; + pMsg->info.conn.applyTerm = pMeta->term; const STraceId *trace = &pMsg->info.traceId; vGTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", - pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, rpcMsg.info.conn.applyIndex, pMeta->isWeak, pMeta->code, + pVnode->config.vgId, pFsm, pMeta->index, pMeta->term, pMsg->info.conn.applyIndex, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state), TMSG_INFO(pMsg->msgType)); - tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg); + tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, pMsg); } -static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { +static void vnodeSyncCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { vnodeSyncApplyMsg(pFsm, pMsg, pMeta); } -static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { +static void vnodeSyncPreCommitMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { if (pMeta->isWeak == 1) { vnodeSyncApplyMsg(pFsm, pMsg, pMeta); } } -static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, const SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { +static void vnodeSyncRollBackMsg(const SSyncFSM *pFsm, SRpcMsg *pMsg, const SFsmCbMeta *pMeta) { SVnode *pVnode = pFsm->data; vTrace("vgId:%d, rollback-cb is excuted, fsm:%p, index:%" PRId64 ", weak:%d, code:%d, state:%d %s, type:%s", pVnode->config.vgId, pFsm, pMeta->index, pMeta->isWeak, pMeta->code, pMeta->state, syncStr(pMeta->state), diff --git a/source/libs/sync/src/syncPipeline.c b/source/libs/sync/src/syncPipeline.c index bf8f830185..2000379160 100644 --- a/source/libs/sync/src/syncPipeline.c +++ b/source/libs/sync/src/syncPipeline.c @@ -455,6 +455,7 @@ int32_t syncLogFsmExecute(SSyncNode* pNode, SSyncFSM* pFsm, ESyncState role, Syn (void)syncRespMgrGetAndDel(pNode->pSyncRespMgr, cbMeta.seqNum, &rpcMsg.info); pFsm->FpCommitCb(pFsm, &rpcMsg, &cbMeta); + ASSERT(rpcMsg.pCont == NULL); return 0; } diff --git a/source/libs/sync/src/syncRaftEntry.c b/source/libs/sync/src/syncRaftEntry.c index 959e00fcde..988a86cc67 100644 --- a/source/libs/sync/src/syncRaftEntry.c +++ b/source/libs/sync/src/syncRaftEntry.c @@ -20,7 +20,7 @@ SSyncRaftEntry* syncEntryBuild(int32_t dataLen) { int32_t bytes = sizeof(SSyncRaftEntry) + dataLen; - SSyncRaftEntry* pEntry = taosMemoryMalloc(bytes); + SSyncRaftEntry* pEntry = taosMemoryCalloc(1, bytes); if (pEntry == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL;