chore: rsma refactor

This commit is contained in:
kailixu 2023-07-12 10:13:50 +08:00
parent 8c8bcabdd4
commit f282acbc4c
9 changed files with 19 additions and 25 deletions

View File

@ -187,6 +187,12 @@ typedef enum {
RSMA_EXEC_COMMIT = 3, // triggered by commit
} ERsmaExecType;
#define TD_SMA_LOOPS_CHECK(n, limit) \
if (++(n) > limit) { \
sched_yield(); \
(n) = 0; \
}
// sma
int32_t tdCheckAndInitSmaEnv(SSma *pSma, int8_t smaType);
void tdDestroySmaEnv(SSmaEnv *pSmaEnv);
@ -205,14 +211,6 @@ static FORCE_INLINE void tdUnRefSmaStat(SSma *pSma, SSmaStat *pStat) {
smaDebug("vgId:%d, unref sma stat:%p, val:%d", SMA_VID(pSma), pStat, ref);
}
static FORCE_INLINE void tdSmaLoopsCheck(int32_t *pCnt, int32_t limit) {
++(*pCnt);
if (*pCnt > limit) {
sched_yield();
*pCnt = 0;
}
}
int32_t smaPreClose(SSma *pSma);
// rsma

View File

@ -467,7 +467,7 @@ enum {
SNAP_DATA_DEL = 3,
SNAP_DATA_RSMA1 = 4,
SNAP_DATA_RSMA2 = 5,
SNAP_DATA_QTASK = 6, // obsolete
SNAP_DATA_QTASK = 6,
SNAP_DATA_TQ_HANDLE = 7,
SNAP_DATA_TQ_OFFSET = 8,
SNAP_DATA_STREAM_TASK = 9,

View File

@ -147,7 +147,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
atomic_store_8(RSMA_TRIGGER_STAT(pRSmaStat), TASK_TRIGGER_STAT_PAUSED);
if (isCommit) {
while (atomic_val_compare_exchange_8(RSMA_COMMIT_STAT(pRSmaStat), 0, 1) != 0) {
tdSmaLoopsCheck(&nLoops, 1000);
TD_SMA_LOOPS_CHECK(nLoops, 1000)
}
}
// step 2: wait for all triggered fetch tasks to finish
@ -159,7 +159,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
} else {
smaDebug("vgId:%d, rsma commit%d, fetch tasks are not all finished yet", SMA_VID(pSma), isCommit);
}
tdSmaLoopsCheck(&nLoops, 1000);
TD_SMA_LOOPS_CHECK(nLoops, 1000);
}
/**
@ -171,7 +171,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
(void *)taosGetSelfPthreadId());
nLoops = 0;
while (atomic_load_64(&pRSmaStat->nBufItems) > 0) {
tdSmaLoopsCheck(&nLoops, 1000);
TD_SMA_LOOPS_CHECK(nLoops, 1000);
}
if (!isCommit) goto _exit;

View File

@ -247,8 +247,9 @@ static int32_t tdInitSmaStat(SSmaStat **pSmaStat, int8_t smaType, const SSma *pS
_exit:
if (code) {
smaError("vgId:%d, %s failed at line %d since %s", SMA_VID(pSma), __func__, lino, tstrerror(code));
} else {
smaDebug("vgId:%d, %s succeed, type:%" PRIi8, SMA_VID(pSma), __func__, smaType);
}
smaDebug("vgId:%d, %s succeed, type:%" PRIi8, SMA_VID(pSma), __func__, smaType);
return code;
}
@ -278,7 +279,7 @@ static void tdDestroyRSmaStat(void *pRSmaStat) {
} else {
smaDebug("vgId:%d, rsma fetch tasks are not all finished yet", SMA_VID(pSma));
}
tdSmaLoopsCheck(&nLoops, 1000);
TD_SMA_LOOPS_CHECK(nLoops, 1000);
}
// step 3:

View File

@ -247,7 +247,7 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
tdRSmaQTaskInfoGetFullPath(TD_VID(pVnode), pRSmaInfo->suid, idx + 1, tfsGetPrimaryPath(pVnode->pTfs), taskInfDir);
if (!taosCheckExistFile(taskInfDir)) {
char *s = taosStrdup(taskInfDir);
if (taosMulMkDir(taosDirName(s)) != 0) {
if (taosMulMkDir(s) != 0) {
terrno = TAOS_SYSTEM_ERROR(errno);
taosMemoryFree(s);
return TSDB_CODE_FAILED;

View File

@ -15,8 +15,6 @@
#include "sma.h"
#define TD_QTASKINFO_FNAME_PREFIX "main.tdb"
void tdRSmaQTaskInfoGetFullPath(int32_t vgId, tb_uid_t suid, int8_t level, const char *path, char *outputName) {
tdRSmaGetDirName(vgId, path, VNODE_RSMA_DIR, true, outputName);
int32_t rsmaLen = strlen(outputName);

View File

@ -3705,15 +3705,15 @@ static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* ret
if (level == TSDB_RETENTION_L0) {
*pLevel = TSDB_RETENTION_L0;
tsdbError("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L0, str);
return VND_RSMA0(pVnode);
} else if (level == TSDB_RETENTION_L1) {
*pLevel = TSDB_RETENTION_L1;
tsdbError("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L1, str);
return VND_RSMA1(pVnode);
} else {
*pLevel = TSDB_RETENTION_L2;
tsdbError("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
tsdbDebug("vgId:%d, rsma level %d is selected to query %s", TD_VID(pVnode), TSDB_RETENTION_L2, str);
return VND_RSMA2(pVnode);
}
}

View File

@ -4,9 +4,6 @@ system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
#todo xukaili sma should use rocksdb.
#return 1
print =============== create database with retentions
sql create database d0 retentions 5s:7d,10s:21d,15s:365d;
sql use d0

View File

@ -4,8 +4,8 @@ system sh/exec.sh -n dnode1 -s start
sleep 50
sql connect
#todo xukaili sma should use rocksdb.
#return 1
#todo wait for streamState checkpoint
return 1
print =============== create database with retentions
sql create database d0 retentions 5s:7d,5m:21d,15m:365d;