enh: rsma retetion and stream state

This commit is contained in:
kailixu 2023-10-31 20:26:42 +08:00
parent 8cedf1f922
commit e6684fa5de
7 changed files with 45 additions and 14 deletions

View File

@ -590,7 +590,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, CFG_SCOPE_SERVER) != 0) return -1;
tsNumOfVnodeRsmaThreads = tsNumOfCores;
tsNumOfVnodeRsmaThreads = tsNumOfCores / 4;
tsNumOfVnodeRsmaThreads = TMAX(tsNumOfVnodeRsmaThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeRsmaThreads", tsNumOfVnodeRsmaThreads, 1, 1024, CFG_SCOPE_SERVER) != 0) return -1;

View File

@ -143,6 +143,7 @@ struct SRSmaInfoItem {
int32_t maxDelay; // ms
tmr_h tmrId;
void *pStreamState;
void *pStreamTask; // SStreamTask
};
struct SRSmaInfo {
@ -218,7 +219,7 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree);
int32_t tdRSmaRestore(SSma *pSma, int8_t type, int64_t committedVer, int8_t rollback);
int32_t tdRSmaProcessCreateImpl(SSma *pSma, SRSmaParam *param, int64_t suid, const char *tbName);
int32_t tdRSmaProcessExecImpl(SSma *pSma, ERsmaExecType type);
// int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash);
int32_t tdRSmaProcessRestoreImpl(SSma *pSma, int8_t type, int64_t qtaskFileVer, int8_t rollback);
void tdRSmaQTaskInfoGetFullPath(SVnode *pVnode, tb_uid_t suid, int8_t level, STfs *pTfs, char *outputName);

View File

@ -209,6 +209,7 @@ int32_t tsdbBegin(STsdb* pTsdb);
// int32_t tsdbCommit(STsdb* pTsdb, SCommitInfo* pInfo);
int32_t tsdbCacheCommit(STsdb* pTsdb);
int32_t tsdbCompact(STsdb* pTsdb, SCompactInfo* pInfo);
int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync);
// int32_t tsdbFinishCommit(STsdb* pTsdb);
// int32_t tsdbRollbackCommit(STsdb* pTsdb);
int tsdbScanAndConvertSubmitMsg(STsdb* pTsdb, SSubmitReq2* pMsg);
@ -274,7 +275,7 @@ int32_t smaPrepareAsyncCommit(SSma* pSma);
int32_t smaCommit(SSma* pSma, SCommitInfo* pInfo);
int32_t smaFinishCommit(SSma* pSma);
int32_t smaPostCommit(SSma* pSma);
int32_t smaDoRetention(SSma* pSma, int64_t now);
int32_t smaRetention(SSma* pSma, int64_t now);
int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg);
int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg);

View File

@ -178,7 +178,7 @@ static int32_t tdProcessRSmaAsyncPreCommitImpl(SSma *pSma, bool isCommit) {
if (!isCommit) goto _exit;
// code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
code = tdRSmaPersistExecImpl(pRSmaStat, RSMA_INFO_HASH(pRSmaStat));
TSDB_CHECK_CODE(code, lino, _exit);
smaInfo("vgId:%d, rsma commit, operator state committed, TID:%p", SMA_VID(pSma), (void *)taosGetSelfPthreadId());

View File

@ -90,6 +90,10 @@ void *tdFreeRSmaInfo(SSma *pSma, SRSmaInfo *pInfo, bool isDeepFree) {
streamStateClose(pItem->pStreamState, false);
}
if(isDeepFree && pItem->pStreamTask) {
taosMemoryFreeClear(pItem->pStreamTask);
}
if (isDeepFree && pInfo->taskInfo[i]) {
tdRSmaQTaskInfoFree(&pInfo->taskInfo[i], SMA_VID(pSma), i + 1);
}
@ -254,11 +258,19 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
taosMemoryFree(s);
}
SStreamTask task = {.id.taskId = 0, .id.streamId = 0}; // TODO: assign value
task.pMeta = pVnode->pTq->pStreamMeta;
pStreamState = streamStateOpen(taskInfDir, &task, true, -1, -1);
SStreamTask *pStreamTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (!pStreamTask) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return TSDB_CODE_FAILED;
}
pStreamTask->id.taskId = 0;
pStreamTask->id.streamId = pRSmaInfo->suid + idx;
pStreamTask->pMeta = pVnode->pTq->pStreamMeta;
pStreamState = streamStateOpen(taskInfDir, pStreamTask, true, -1, -1);
if (!pStreamState) {
terrno = TSDB_CODE_RSMA_STREAM_STATE_OPEN;
taosMemoryFreeClear(pStreamTask);
return TSDB_CODE_FAILED;
}
@ -268,11 +280,13 @@ static int32_t tdSetRSmaInfoItemParams(SSma *pSma, SRSmaParam *param, SRSmaStat
pRSmaInfo->taskInfo[idx] = qCreateStreamExecTaskInfo(param->qmsg[idx], &handle, TD_VID(pVnode), 0);
if (!pRSmaInfo->taskInfo[idx]) {
terrno = TSDB_CODE_RSMA_QTASKINFO_CREATE;
taosMemoryFreeClear(pStreamTask);
return TSDB_CODE_FAILED;
}
SRSmaInfoItem *pItem = &(pRSmaInfo->items[idx]);
pItem->triggerStat = TASK_TRIGGER_STAT_ACTIVE; // fetch the data when reboot
pItem->pStreamState = pStreamState;
pItem->pStreamTask = pStreamTask;
if (param->maxdelay[idx] < TSDB_MIN_ROLLUP_MAX_DELAY) {
int64_t msInterval =
convertTimeFromPrecisionToUnit(pRetention[idx + 1].freq, pTsdbCfg->precision, TIME_UNIT_MILLISECOND);
@ -562,7 +576,7 @@ static int32_t tdFetchSubmitReqSuids(SSubmitReq2 *pMsg, STbUidStore *pStore) {
* @param now
* @return int32_t
*/
int32_t smaDoRetention(SSma *pSma, int64_t now) {
int32_t smaRetention(SSma *pSma, int64_t now) {
int32_t code = TSDB_CODE_SUCCESS;
if (!VND_IS_RSMA(pSma->pVnode)) {
return code;
@ -570,8 +584,8 @@ int32_t smaDoRetention(SSma *pSma, int64_t now) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) {
// code = tsdbDoRetention(pSma->pRSmaTsdb[i], now);
// if (code) goto _end;
code = tsdbRetention(pSma->pRSmaTsdb[i], now, pSma->pVnode->config.sttTrigger == 1);
if (code) goto _end;
}
}
@ -1050,7 +1064,7 @@ _err:
return code;
}
#if 0
#if 1
int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
int32_t code = 0;
int32_t lino = 0;
@ -1072,6 +1086,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
SRSmaInfoItem *pItem = RSMA_INFO_ITEM(pRSmaInfo, i);
#if 0
if (pItem && pItem->pStreamState) {
if (streamStateCommit(pItem->pStreamState) < 0) {
code = TSDB_CODE_RSMA_STREAM_STATE_COMMIT;
@ -1080,6 +1095,11 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat, SHashObj *pInfoHash) {
smaDebug("vgId:%d, rsma persist, stream state commit success, table %" PRIi64 ", level %d", TD_VID(pVnode),
pRSmaInfo->suid, i + 1);
}
#endif
if(pItem && pItem->pStreamState) {
}
}
}

View File

@ -30,8 +30,13 @@ void tdRSmaGetDirName(SVnode *pVnode, STfs *pTfs, bool endWithSep, char *outputN
offset = strlen(outputName);
// rsma
#if 0
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%s", TD_DIRSEP, VNODE_RSMA_DIR,
(endWithSep ? TD_DIRSEP : ""));
#else
snprintf(outputName + offset, TSDB_FILENAME_LEN - offset - 1, "%s%s%s%s%s%s%s", TD_DIRSEP, "tq", TD_DIRSEP, "stream",
TD_DIRSEP, "state", (endWithSep ? TD_DIRSEP : ""));
#endif
}
// smaXXXUtil ================

View File

@ -15,8 +15,12 @@
#include "vnd.h"
extern int32_t tsdbRetention(STsdb *tsdb, int64_t now, int32_t sync);
int32_t vnodeDoRetention(SVnode *pVnode, int64_t now) {
return tsdbRetention(pVnode->pTsdb, now, pVnode->config.sttTrigger == 1);
int32_t code = TSDB_CODE_SUCCESS;
code = tsdbRetention(pVnode->pTsdb, now, pVnode->config.sttTrigger == 1);
if (TSDB_CODE_SUCCESS == code) code = smaRetention(pVnode->pSma, now);
return code;
}