From 54526adcc3e304c48ef521669020f9ec1bbd2410 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 19 Oct 2022 17:18:41 +0800 Subject: [PATCH] fix: commit txn for rsma --- source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/sma/smaCommit.c | 24 ++++++++++++++++++++++++ source/dnode/vnode/src/vnd/vnodeCommit.c | 20 +++++++++++--------- 3 files changed, 36 insertions(+), 9 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 42ab0c2a37..a5dc4431ab 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -211,6 +211,7 @@ int32_t smaSyncCommit(SSma* pSma); int32_t smaSyncPostCommit(SSma* pSma); int32_t smaPreCommit(SSma* pSma); int32_t smaCommit(SSma* pSma); +int32_t smaFinishCommit(SSma* pSma); int32_t smaPostCommit(SSma* pSma); int32_t smaDoRetention(SSma* pSma, int64_t now); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 3dce724de7..d1717f9a1e 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -112,6 +112,30 @@ int32_t smaBegin(SSma *pSma) { return TSDB_CODE_SUCCESS; } +int32_t smaFinishCommit(SSma *pSma) { + int32_t code = 0; + SVnode *pVnode = pSma->pVnode; + SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); + if (!pSmaEnv) { + goto _exit; + } + if ((code = tsdbFinishCommit(VND_RSMA0(pVnode))) < 0) { + smaError("vgId:%d, failed to finish commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(code)); + goto _exit; + } + if ((code = tsdbFinishCommit(VND_RSMA1(pVnode))) < 0) { + smaError("vgId:%d, failed to finish commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code)); + goto _exit; + } + if ((code = tsdbFinishCommit(VND_RSMA2(pVnode))) < 0) { + smaError("vgId:%d, failed to finish commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code)); + goto _exit; + } +_exit: + terrno = code; + return code; +} + #if 0 /** * @brief pre-commit for rollup sma(sync commit). diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 24b678b5eb..0bc1623d8b 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -239,10 +239,8 @@ int vnodeCommit(SVnode *pVnode) { } walBeginSnapshot(pVnode->pWal, pVnode->state.applied); - if (smaPreCommit(pVnode->pSma) < 0) { - vError("vgId:%d, failed to pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } + code = smaPreCommit(pVnode->pSma); + TSDB_CHECK_CODE(code, lino, _exit); vnodeBufPoolUnRef(pVnode->inUse); pVnode->inUse = NULL; @@ -254,10 +252,8 @@ int vnodeCommit(SVnode *pVnode) { } if (VND_IS_RSMA(pVnode)) { - if (smaCommit(pVnode->pSma) < 0) { - vError("vgId:%d, failed to commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } + code = smaCommit(pVnode->pSma); + TSDB_CHECK_CODE(code, lino, _exit); } else { code = tsdbCommit(pVnode->pTsdb); TSDB_CHECK_CODE(code, lino, _exit); @@ -274,7 +270,13 @@ int vnodeCommit(SVnode *pVnode) { TSDB_CHECK_CODE(code, lino, _exit); } - tsdbFinishCommit(pVnode->pTsdb); + if (VND_IS_RSMA(pVnode)) { + code = smaFinishCommit(pVnode->pSma); + TSDB_CHECK_CODE(code, lino, _exit); + } else { + code = tsdbFinishCommit(pVnode->pTsdb); + TSDB_CHECK_CODE(code, lino, _exit); + } if (metaFinishCommit(pVnode->pMeta) < 0) { code = terrno;