From a6e3da8045bf1431d55a1aa2d110e2a59ede23f3 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 19 Oct 2022 09:37:38 +0800 Subject: [PATCH 1/2] chore: code optimization for sma commit --- source/dnode/vnode/src/inc/vnodeInt.h | 6 ++--- source/dnode/vnode/src/sma/smaCommit.c | 33 +++++++++++++++++------- source/dnode/vnode/src/vnd/vnodeCommit.c | 29 +++++---------------- 3 files changed, 33 insertions(+), 35 deletions(-) diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 712f8bd15b..4685d3696b 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -206,9 +206,9 @@ int32_t smaBegin(SSma* pSma); int32_t smaSyncPreCommit(SSma* pSma); int32_t smaSyncCommit(SSma* pSma); int32_t smaSyncPostCommit(SSma* pSma); -int32_t smaAsyncPreCommit(SSma* pSma); -int32_t smaAsyncCommit(SSma* pSma); -int32_t smaAsyncPostCommit(SSma* pSma); +int32_t smaPreCommit(SSma* pSma); +int32_t smaCommit(SSma* pSma); +int32_t smaPostCommit(SSma* pSma); int32_t smaDoRetention(SSma* pSma, int64_t now); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 6168a00815..3dce724de7 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -54,28 +54,28 @@ int32_t smaSyncPostCommit(SSma *pSma) { return tdProcessRSmaSyncPostCommitImpl(p #endif /** - * @brief Only applicable to Rollup SMA + * @brief async commit, only applicable to Rollup SMA * * @param pSma * @return int32_t */ -int32_t smaAsyncPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); } +int32_t smaPreCommit(SSma *pSma) { return tdProcessRSmaAsyncPreCommitImpl(pSma); } /** - * @brief Only applicable to Rollup SMA + * @brief async commit, only applicable to Rollup SMA * * @param pSma * @return int32_t */ -int32_t smaAsyncCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); } +int32_t smaCommit(SSma *pSma) { return tdProcessRSmaAsyncCommitImpl(pSma); } /** - * @brief Only applicable to Rollup SMA + * @brief async commit, only applicable to Rollup SMA * * @param pSma * @return int32_t */ -int32_t smaAsyncPostCommit(SSma *pSma) { return tdProcessRSmaAsyncPostCommitImpl(pSma); } +int32_t smaPostCommit(SSma *pSma) { return tdProcessRSmaAsyncPostCommitImpl(pSma); } /** * @brief set rsma trigger stat active @@ -366,9 +366,11 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { * @return int32_t */ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { + int32_t code = 0; + SVnode *pVnode = pSma->pVnode; SSmaEnv *pSmaEnv = SMA_RSMA_ENV(pSma); if (!pSmaEnv) { - return TSDB_CODE_SUCCESS; + goto _exit; } #if 0 SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pSmaEnv); @@ -378,8 +380,21 @@ static int32_t tdProcessRSmaAsyncCommitImpl(SSma *pSma) { return TSDB_CODE_FAILED; } #endif - - return TSDB_CODE_SUCCESS; + if ((code = tsdbCommit(VND_RSMA0(pVnode))) < 0) { + smaError("vgId:%d, failed to commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(code)); + goto _exit; + } + if ((code = tsdbCommit(VND_RSMA1(pVnode))) < 0) { + smaError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(code)); + goto _exit; + } + if ((code = tsdbCommit(VND_RSMA2(pVnode))) < 0) { + smaError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(code)); + goto _exit; + } +_exit: + terrno = code; + return code; } /** diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 07c4c32955..38f21174a7 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -239,10 +239,8 @@ int vnodeCommit(SVnode *pVnode) { } walBeginSnapshot(pVnode->pWal, pVnode->state.applied); - // preCommit - // smaSyncPreCommit(pVnode->pSma); - if(smaAsyncPreCommit(pVnode->pSma) < 0){ - vError("vgId:%d, failed to async pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); + if (smaPreCommit(pVnode->pSma) < 0) { + vError("vgId:%d, failed to pre-commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } @@ -256,21 +254,8 @@ int vnodeCommit(SVnode *pVnode) { } if (VND_IS_RSMA(pVnode)) { - if (smaAsyncCommit(pVnode->pSma) < 0) { - vError("vgId:%d, failed to async commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } - - if (tsdbCommit(VND_RSMA0(pVnode)) < 0) { - vError("vgId:%d, failed to commit tsdb rsma0 since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } - if (tsdbCommit(VND_RSMA1(pVnode)) < 0) { - vError("vgId:%d, failed to commit tsdb rsma1 since %s", TD_VID(pVnode), tstrerror(terrno)); - return -1; - } - if (tsdbCommit(VND_RSMA2(pVnode)) < 0) { - vError("vgId:%d, failed to commit tsdb rsma2 since %s", TD_VID(pVnode), tstrerror(terrno)); + if (smaCommit(pVnode->pSma) < 0) { + vError("vgId:%d, failed to commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } } else { @@ -294,10 +279,8 @@ int vnodeCommit(SVnode *pVnode) { pVnode->state.committed = info.state.committed; - // postCommit - // smaSyncPostCommit(pVnode->pSma); - if (smaAsyncPostCommit(pVnode->pSma) < 0) { - vError("vgId:%d, failed to async post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); + if (smaPostCommit(pVnode->pSma) < 0) { + vError("vgId:%d, failed to post-commit sma since %s", TD_VID(pVnode), tstrerror(terrno)); return -1; } From 76843a54cbefa98112db0b1f48214c8285c4c168 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 19 Oct 2022 10:30:14 +0800 Subject: [PATCH 2/2] chore: test case optimization for tsma --- tests/script/tsim/sma/tsmaCreateInsertQuery.sim | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tests/script/tsim/sma/tsmaCreateInsertQuery.sim b/tests/script/tsim/sma/tsmaCreateInsertQuery.sim index 2ff01263a4..7aef9e0f86 100644 --- a/tests/script/tsim/sma/tsmaCreateInsertQuery.sim +++ b/tests/script/tsim/sma/tsmaCreateInsertQuery.sim @@ -5,7 +5,7 @@ sleep 50 sql connect print =============== create database -sql create database d1 vgroups 1 +sql create database d1 keep 36500d vgroups 1 sql use d1 print =============== create super table, include column type for count/sum/min/max/first @@ -25,8 +25,8 @@ if $rows != 1 then endi print =============== insert data, mode1: one row one table in sql -sql insert into ct1 values(now+0s, 10, 2.0, 3.0) -sql insert into ct1 values(now+1s, 11, 2.1, 3.1)(now+2s, -12, -2.2, -3.2)(now+3s, -13, -2.3, -3.3) +sql insert into ct1 values('2022-10-19 09:55:45.682', 10, 2.0, 3.0) +sql insert into ct1 values('2022-10-19 09:55:46.682', 11, 2.1, 3.1)('2022-10-19 09:55:47.682', -12, -2.2, -3.2)('2022-10-19 09:55:48.682', -13, -2.3, -3.3) print =============== create sma index from super table @@ -34,7 +34,7 @@ sql create sma index sma_index_name1 on stb function(max(c1),max(c2),min(c1)) in print $data00 $data01 $data02 $data03 print =============== trigger stream to execute sma aggr task and insert sma data into sma store -sql insert into ct1 values(now+5s, 20, 20.0, 30.0) +sql insert into ct1 values('2022-10-19 09:55:50.682', 20, 20.0, 30.0) #=================================================================== print =============== show streams ================================