From f282acbc4c052ed851fc68770210b1fb8edae2f5 Mon Sep 17 00:00:00 2001 From: kailixu Date: Wed, 12 Jul 2023 10:13:50 +0800 Subject: [PATCH] chore: rsma refactor --- source/dnode/vnode/src/inc/sma.h | 14 ++++++-------- source/dnode/vnode/src/inc/vnodeInt.h | 2 +- source/dnode/vnode/src/sma/smaCommit.c | 6 +++--- source/dnode/vnode/src/sma/smaEnv.c | 5 +++-- source/dnode/vnode/src/sma/smaRollup.c | 2 +- source/dnode/vnode/src/sma/smaUtil.c | 2 -- source/dnode/vnode/src/tsdb/tsdbRead.c | 6 +++--- tests/script/tsim/sma/rsmaCreateInsertQuery.sim | 3 --- tests/script/tsim/sma/rsmaPersistenceRecovery.sim | 4 ++-- 9 files changed, 19 insertions(+), 25 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 29d3e752b2..f7eda438d0 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -187,6 +187,12 @@ typedef enum { RSMA_EXEC_COMMIT = 3, // triggered by commit } ERsmaExecType; +#define TD_SMA_LOOPS_CHECK(n, limit) \ + if (++(n) > limit) { \ + sched_yield(); \ + (n) = 0; \ + } + // sma int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType); void tdDestroySmaEnv(SSmaEnv *pSmaEnv); @@ -205,14 +211,6 @@ static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) { smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref); } -static FORCE_INLINE void tdSmaLoopsCheck(int32_t *pCnt, int32_t limit) { - ++(*pCnt); - if (*pCnt > limit) { - sched_yield(); - *pCnt = 0; - } -} - int32_t smaPreClose(SSma *pSma); // rsma diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 64613b7bd3..c8e8dc925d 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -467,7 +467,7 @@ enum { SNAP_DATA_DEL = 3, SNAP_DATA_RSMA1 = 4, SNAP_DATA_RSMA2 = 5, - SNAP_DATA_QTASK = 6, // obsolete + SNAP_DATA_QTASK = 6, SNAP_DATA_TQ_HANDLE = 7, SNAP_DATA_TQ_OFFSET = 8, SNAP_DATA_STREAM_TASK = 9, diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index 2aa898e59e..6ee7c414e0 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -147,7 +147,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED); if (isCommit) { while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) { - tdSmaLoopsCheck(&nLoops, 1000); + TD_SMA_LOOPS_CHECK(nLoops, 1000) } } // step 2: wait for all triggered fetch tasks to finish @@ -159,7 +159,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { } else { smaDebug("vgId:%d, rsma commit%d, fetch tasks are not all finished yet", SMA_VID(pSma), isCommit); } - tdSmaLoopsCheck(&nLoops, 1000); + TD_SMA_LOOPS_CHECK(nLoops, 1000); } /** @@ -171,7 +171,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { (void *)taosGetSelfPthreadId()); nLoops = 0; while (atomic_load_64(&pRSmaStat->nBufItems) > 0) { - tdSmaLoopsCheck(&nLoops, 1000); + TD_SMA_LOOPS_CHECK(nLoops, 1000); } if (!isCommit) goto _exit; diff --git a/source/dnode/vnode/src/sma/smaEnv.c b/source/dnode/vnode/src/sma/smaEnv.c index c171355cb2..04a254fc7a 100644 --- a/source/dnode/vnode/src/sma/smaEnv.c +++ b/source/dnode/vnode/src/sma/smaEnv.c @@ -247,8 +247,9 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS _exit: if (code) { smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code)); + } else { + smaDebug("vgId:%d, %s succeed, type:%" PRIi8, SMA_VID(pSma), __func__, smaType); } - smaDebug("vgId:%d, %s succeed, type:%" PRIi8, SMA_VID(pSma), __func__, smaType); return code; } @@ -278,7 +279,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) { } else { smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma)); } - tdSmaLoopsCheck(&nLoops, 1000); + TD_SMA_LOOPS_CHECK(nLoops, 1000); } // step 3: diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 72d7895f2a..bcb64235d4 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -247,7 +247,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat tdRSmaQTaskInfoGetFullPath(TD_VID(pVnode), pRSmaInfo->suid, idx + 1, tfsGetPrimaryPath(pVnode->pTfs), taskInfDir); if (!taosCheckExistFile(taskInfDir)) { char *s = taosStrdup(taskInfDir); - if (taosMulMkDir(taosDirName(s)) != 0) { + if (taosMulMkDir(s) != 0) { terrno = TAOS_SYSTEM_ERROR(errno); taosMemoryFree(s); return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index beb3e24c70..5c701cd683 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -15,8 +15,6 @@ #include "sma.h" -#define TD_QTASKINFO_FNAME_PREFIX "main.tdb" - void tdRSmaQTaskInfoGetFullPath(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) { tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName); int32_t rsmaLen = strlen(outputName); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 45df342f77..165448fb7b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -3705,15 +3705,15 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret if (level == TSDB_RETENTION_L0) { *pLevel = TSDB_RETENTION_L0; - tsdbError("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str); + tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str); return VND_RSMA0(pVnode); } else if (level == TSDB_RETENTION_L1) { *pLevel = TSDB_RETENTION_L1; - tsdbError("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str); + tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str); return VND_RSMA1(pVnode); } else { *pLevel = TSDB_RETENTION_L2; - tsdbError("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str); + tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str); return VND_RSMA2(pVnode); } } diff --git a/tests/script/tsim/sma/rsmaCreateInsertQuery.sim b/tests/script/tsim/sma/rsmaCreateInsertQuery.sim index 75f75072d7..b3144e4e0d 100644 --- a/tests/script/tsim/sma/rsmaCreateInsertQuery.sim +++ b/tests/script/tsim/sma/rsmaCreateInsertQuery.sim @@ -4,9 +4,6 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect -#todo xukaili sma should use rocksdb. -#return 1 - print =============== create database with retentions sql create database d0 retentions 5s:7d,10s:21d,15s:365d; sql use d0 diff --git a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim index 709d2ffda8..0b3938d773 100644 --- a/tests/script/tsim/sma/rsmaPersistenceRecovery.sim +++ b/tests/script/tsim/sma/rsmaPersistenceRecovery.sim @@ -4,8 +4,8 @@ system sh/exec.sh -n dnode1 -s start sleep 50 sql connect -#todo xukaili sma should use rocksdb. -#return 1 +#todo wait for streamState checkpoint +return 1 print =============== create database with retentions sql create database d0 retentions 5s:7d,5m:21d,15m:365d;