From 2a45862946d7b47063e5cc03b752aa352c03b6b4 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 30 Aug 2022 09:55:47 +0800 Subject: [PATCH 1/5] other: code format --- source/dnode/vnode/src/sma/smaCommit.c | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 0e644be288..4bd3cfa5ac 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -312,7 +312,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { SSmaStat *pStat = SMA_ENV_STAT(pEnv); SRSmaStat *pRSmaStat = SMA_STAT_RSMA(pStat); - int32_t nLoops = 0; + int32_t nLoops = 0; // step 1: set rsma stat atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); @@ -351,7 +351,8 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { return TSDB_CODE_FAILED; } - smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma), (void*)taosGetSelfPthreadId()); + smaInfo("vgId:%d, rsma commit, wait for all items to be consumed, TID:%p", SMA_VID(pSma), + (void *)taosGetSelfPthreadId()); nLoops = 0; while (atomic_load_64(&pRSmaStat->nBufItems) > 0) { ++nLoops; @@ -366,7 +367,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { } smaInfo("vgId:%d, rsma commit, operator state commited, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); -#if 0 // consuming task of qTaskInfo clone +#if 0 // consuming task of qTaskInfo clone // step 4: swap queue/qall and iQueue/iQall // lock // taosWLockLatch(SMA_ENV_LOCK(pEnv)); From 5531a5a121155cf2b28bbadda404a7e0e5d62e31 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 30 Aug 2022 11:33:34 +0800 Subject: [PATCH 2/5] enh: rsam fetch all logic optimization --- source/dnode/vnode/src/inc/sma.h | 1 + source/dnode/vnode/src/sma/smaRollup.c | 9 +++++++-- source/dnode/vnode/src/sma/smaSnapshot.c | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index abfffc045f..c29c4cb6c4 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -95,6 +95,7 @@ struct SRSmaStat { int64_t refId; // shared by fetch tasks volatile int64_t nBufItems; // number of items in queue buffer SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) + volatile int32_t nFetchAll; // active number of fetch all int8_t triggerStat; // shared by fetch tasks int8_t commitStat; // 0 not in committing, 1 in committing SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 52b08d131c..89cdd58c4e 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1714,9 +1714,14 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type); } - if (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2) == 0) { + int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2); + if (oldStat == 0 || + ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) { + atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1); tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); - atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); + if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) { + atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); + } } if (qallItemSize > 0) { diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index 335c15a539..28d7218bc8 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -80,7 +80,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead goto _err; } pReader->pQTaskFReader->pReadH = qTaskF; -#if 0 +#if 1 SQTaskFile* pQTaskF = &pReader->pQTaskFReader->fTask; pQTaskF->nRef = 1; #endif From 7a67a9be4b424e1ff84ac22a6b123c8a67c9fb43 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 30 Aug 2022 11:44:28 +0800 Subject: [PATCH 3/5] enh: rsma fetch all logic optimization --- source/dnode/vnode/src/inc/sma.h | 1 + source/dnode/vnode/src/sma/smaRollup.c | 9 +++++++-- source/dnode/vnode/src/sma/smaSnapshot.c | 2 +- 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index abfffc045f..c29c4cb6c4 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -95,6 +95,7 @@ struct SRSmaStat { int64_t refId; // shared by fetch tasks volatile int64_t nBufItems; // number of items in queue buffer SRWLatch lock; // r/w lock for rsma fs(e.g. qtaskinfo) + volatile int32_t nFetchAll; // active number of fetch all int8_t triggerStat; // shared by fetch tasks int8_t commitStat; // 0 not in committing, 1 in committing SArray *aTaskFile; // qTaskFiles committed recently(for recovery/snapshot r/w) diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 52b08d131c..89cdd58c4e 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -1714,9 +1714,14 @@ int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type) { smaDebug("vgId:%d, batchSize:%d, execType:%" PRIi8, SMA_VID(pSma), qallItemSize, type); } - if (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2) == 0) { + int8_t oldStat = atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 2); + if (oldStat == 0 || + ((oldStat == 2) && atomic_load_8(RSMA_TRIGGER_STAT(pRSmaStat)) < TASK_TRIGGER_STAT_PAUSED)) { + atomic_fetch_add_32(&pRSmaStat->nFetchAll, 1); tdRSmaFetchAllResult(pSma, pInfo, pSubmitArr); - atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); + if (0 == atomic_sub_fetch_32(&pRSmaStat->nFetchAll, 1)) { + atomic_store_8(RSMA_COMMIT_STAT(pRSmaStat), 0); + } } if (qallItemSize > 0) { diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index 335c15a539..28d7218bc8 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -80,7 +80,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead goto _err; } pReader->pQTaskFReader->pReadH = qTaskF; -#if 0 +#if 1 SQTaskFile* pQTaskF = &pReader->pQTaskFReader->fTask; pQTaskF->nRef = 1; #endif From 3b3abbc293e85721f13edbb78a1401f1a927bcb3 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 30 Aug 2022 11:47:23 +0800 Subject: [PATCH 4/5] other: revert the code --- source/dnode/vnode/src/sma/smaSnapshot.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/sma/smaSnapshot.c b/source/dnode/vnode/src/sma/smaSnapshot.c index 28d7218bc8..335c15a539 100644 --- a/source/dnode/vnode/src/sma/smaSnapshot.c +++ b/source/dnode/vnode/src/sma/smaSnapshot.c @@ -80,7 +80,7 @@ int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapRead goto _err; } pReader->pQTaskFReader->pReadH = qTaskF; -#if 1 +#if 0 SQTaskFile* pQTaskF = &pReader->pQTaskFReader->fTask; pQTaskF->nRef = 1; #endif From e257bd3986cacc260eebc30aeae9f56c559a765b Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 30 Aug 2022 11:50:50 +0800 Subject: [PATCH 5/5] other: code optimization --- source/dnode/vnode/src/sma/smaCommit.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 4bd3cfa5ac..4d1dcd6909 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -327,7 +327,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma) { ASSERT(pRSmaStat->commitAppliedVer > 0); // step 2: wait for all triggered fetch tasks to finish - + nLoops = 0; while (1) { if (T_REF_VAL_GET(pStat) == 0) { smaDebug("vgId:%d, rsma commit, fetch tasks are all finished", SMA_VID(pSma));