From e6684fa5de9fd95ed83359a3d8965f5b74cfd987 Mon Sep 17 00:00:00 2001 From: kailixu Date: Tue, 31 Oct 2023 20:26:42 +0800 Subject: [PATCH] enh: rsma retetion and stream state --- source/common/src/tglobal.c | 2 +- source/dnode/vnode/src/inc/sma.h | 3 +- source/dnode/vnode/src/inc/vnodeInt.h | 3 +- source/dnode/vnode/src/sma/smaCommit.c | 2 +- source/dnode/vnode/src/sma/smaRollup.c | 34 ++++++++++++++++----- source/dnode/vnode/src/sma/smaUtil.c | 5 +++ source/dnode/vnode/src/vnd/vnodeRetention.c | 10 ++++-- 7 files changed, 45 insertions(+), 14 deletions(-) diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index c6cff27011..cc485b16dc 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -590,7 +590,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, CFG_SCOPE_SERVER) != 0) return -1; - tsNumOfVnodeRsmaThreads = tsNumOfCores; + tsNumOfVnodeRsmaThreads = tsNumOfCores / 4; tsNumOfVnodeRsmaThreads = TMAX(tsNumOfVnodeRsmaThreads, 4); if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, CFG_SCOPE_SERVER) != 0) return -1; diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index aaf0973b41..5dd7df0962 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -143,6 +143,7 @@ struct SRSmaInfoItem { int32_t maxDelay; // ms tmr_h tmrId; void *pStreamState; + void *pStreamTask; // SStreamTask }; struct SRSmaInfo { @@ -218,7 +219,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree); int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback); int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName); int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type); -// int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); +int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash); int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback); void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 12e273c32d..c7343b1b42 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -209,6 +209,7 @@ int32_t tsdbBegin(STsdb* pTsdb); // int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo); int32_t tsdbCacheCommit(STsdb* pTsdb); int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo); +int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync); // int32_t tsdbFinishCommit(STsdb* pTsdb); // int32_t tsdbRollbackCommit(STsdb* pTsdb); int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg); @@ -274,7 +275,7 @@ int32_t smaPrepareAsyncCommit(SSma* pSma); int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo); int32_t smaFinishCommit(SSma* pSma); int32_t smaPostCommit(SSma* pSma); -int32_t smaDoRetention(SSma* pSma, int64_t now); +int32_t smaRetention(SSma* pSma, int64_t now); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); diff --git a/source/dnode/vnode/src/sma/smaCommit.c b/source/dnode/vnode/src/sma/smaCommit.c index c26157f4b7..652aab3c01 100644 --- a/source/dnode/vnode/src/sma/smaCommit.c +++ b/source/dnode/vnode/src/sma/smaCommit.c @@ -178,7 +178,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) { if (!isCommit) goto _exit; - // code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); + code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat)); TSDB_CHECK_CODE(code, lino, _exit); smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId()); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 14c5baa402..4ea7d4612a 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -90,6 +90,10 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) { streamStateClose(pItem->pStreamState, false); } + if(isDeepFree && pItem->pStreamTask) { + taosMemoryFreeClear(pItem->pStreamTask); + } + if (isDeepFree && pInfo->taskInfo[i]) { tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1); } @@ -254,11 +258,19 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat taosMemoryFree(s); } - SStreamTask task = {.id.taskId = 0, .id.streamId = 0}; // TODO: assign value - task.pMeta = pVnode->pTq->pStreamMeta; - pStreamState = streamStateOpen(taskInfDir, &task, true, -1, -1); + SStreamTask *pStreamTask = taosMemoryCalloc(1, sizeof(SStreamTask)); + if (!pStreamTask) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return TSDB_CODE_FAILED; + } + pStreamTask->id.taskId = 0; + pStreamTask->id.streamId = pRSmaInfo->suid + idx; + pStreamTask->pMeta = pVnode->pTq->pStreamMeta; + + pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1); if (!pStreamState) { terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN; + taosMemoryFreeClear(pStreamTask); return TSDB_CODE_FAILED; } @@ -268,11 +280,13 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0); if (!pRSmaInfo->taskInfo[idx]) { terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE; + taosMemoryFreeClear(pStreamTask); return TSDB_CODE_FAILED; } SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]); pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot pItem->pStreamState = pStreamState; + pItem->pStreamTask = pStreamTask; if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) { int64_t msInterval = convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND); @@ -562,7 +576,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) { * @param now * @return int32_t */ -int32_t smaDoRetention(SSma *pSma, int64_t now) { +int32_t smaRetention(SSma *pSma, int64_t now) { int32_t code = TSDB_CODE_SUCCESS; if (!VND_IS_RSMA(pSma->pVnode)) { return code; @@ -570,8 +584,8 @@ int32_t smaDoRetention(SSma *pSma, int64_t now) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { if (pSma->pRSmaTsdb[i]) { - // code = tsdbDoRetention(pSma->pRSmaTsdb[i], now); - // if (code) goto _end; + code = tsdbRetention(pSma->pRSmaTsdb[i], now, pSma->pVnode->config.sttTrigger == 1); + if (code) goto _end; } } @@ -1050,7 +1064,7 @@ _err: return code; } -#if 0 +#if 1 int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { int32_t code = 0; int32_t lino = 0; @@ -1072,6 +1086,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i); +#if 0 if (pItem && pItem->pStreamState) { if (streamStateCommit(pItem->pStreamState) < 0) { code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT; @@ -1080,6 +1095,11 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) { smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 ", level %d", TD_VID(pVnode), pRSmaInfo->suid, i + 1); } +#endif + if(pItem && pItem->pStreamState) { + + } + } } diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index e45cbac329..479c57e65f 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -30,8 +30,13 @@ void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputN offset = strlen(outputName); // rsma +#if 0 snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%s", TD_DIRSEP, VNODE_RSMA_DIR, (endWithSep ? TD_DIRSEP : "")); +#else + snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%s%s%s%s%s", TD_DIRSEP, "tq", TD_DIRSEP, "stream", + TD_DIRSEP, "state", (endWithSep ? TD_DIRSEP : "")); +#endif } // smaXXXUtil ================ diff --git a/source/dnode/vnode/src/vnd/vnodeRetention.c b/source/dnode/vnode/src/vnd/vnodeRetention.c index f3344d1d7d..c510c0fe92 100644 --- a/source/dnode/vnode/src/vnd/vnodeRetention.c +++ b/source/dnode/vnode/src/vnd/vnodeRetention.c @@ -15,8 +15,12 @@ #include "vnd.h" -extern int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync); - int32_t vnodeDoRetention(SVnode *pVnode, int64_t now) { - return tsdbRetention(pVnode->pTsdb, now, pVnode->config.sttTrigger == 1); + int32_t code = TSDB_CODE_SUCCESS; + + code = tsdbRetention(pVnode->pTsdb, now, pVnode->config.sttTrigger == 1); + + if (TSDB_CODE_SUCCESS == code) code = smaRetention(pVnode->pSma, now); + + return code; } \ No newline at end of file