From eedaeda8461e467237e2ba65b188b28fc68e04bc Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Tue, 5 Jul 2022 19:30:37 +0800 Subject: [PATCH] enh: rsma level 1/2 utilize separated version --- source/dnode/vnode/src/inc/sma.h | 8 ++++- source/dnode/vnode/src/inc/vnodeInt.h | 1 + source/dnode/vnode/src/sma/smaRollup.c | 49 +++++++++++++++++++++++--- source/dnode/vnode/src/sma/smaUtil.c | 8 +++++ source/dnode/vnode/src/tsdb/tsdbRead.c | 32 ++++++++++++----- 5 files changed, 84 insertions(+), 14 deletions(-) diff --git a/source/dnode/vnode/src/inc/sma.h b/source/dnode/vnode/src/inc/sma.h index 7f7b3fa885..2d6edae0e7 100644 --- a/source/dnode/vnode/src/inc/sma.h +++ b/source/dnode/vnode/src/inc/sma.h @@ -62,6 +62,7 @@ struct STSmaStat { struct SRSmaStat { SSma *pSma; + int64_t submitVer; int64_t refId; // shared by fetch tasks void *tmrHandle; // shared by fetch tasks int8_t triggerStat; // shared by fetch tasks @@ -84,6 +85,7 @@ struct SSmaStat { #define RSMA_TRIGGER_STAT(r) (&(r)->triggerStat) #define RSMA_RUNNING_STAT(r) (&(r)->runningStat) #define RSMA_REF_ID(r) ((r)->refId) +#define RSMA_SUBMIT_VER(r) ((r)->submitVer) enum { TASK_TRIGGER_STAT_INIT = 0, @@ -208,11 +210,15 @@ struct STFInfo { // specific fields union { struct { - int64_t applyVer[2]; + int64_t submitVer; } qTaskInfo; }; }; +enum { + TD_FTYPE_RSMA_QTASKINFO = 0, +}; + struct STFile { uint8_t state; STFInfo info; diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 4f7a890274..3e2d7c614f 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -172,6 +172,7 @@ int32_t smaPostCommit(SSma* pSma); int32_t tdProcessTSmaCreate(SSma* pSma, int64_t version, const char* msg); int32_t tdProcessTSmaInsert(SSma* pSma, int64_t indexUid, const char* msg); +int64_t tdRSmaGetMaxSubmitVer(SSma* pSma, int8_t level); int32_t tdProcessRSmaCreate(SVnode* pVnode, SVCreateStbReq* pReq); int32_t tdProcessRSmaSubmit(SSma* pSma, void* pMsg, int32_t inputType); diff --git a/source/dnode/vnode/src/sma/smaRollup.c b/source/dnode/vnode/src/sma/smaRollup.c index 02277a966a..ff6915156c 100644 --- a/source/dnode/vnode/src/sma/smaRollup.c +++ b/source/dnode/vnode/src/sma/smaRollup.c @@ -523,6 +523,17 @@ static void tdDestroySDataBlockArray(SArray *pArray) { taosArrayDestroy(pArray); } +int64_t tdRSmaGetMaxSubmitVer(SSma *pSma, int8_t level) { + if (level == TSDB_RETENTION_L0) { + return pSma->pVnode->state.applied; + } + + SSmaEnv *pRSmaEnv = SMA_RSMA_ENV(pSma); + SRSmaStat *pRSmaStat = (SRSmaStat *)(SMA_ENV_STAT(pRSmaEnv)); + + return atomic_load_64(&pRSmaStat->submitVer); +} + static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) { SArray *pResult = NULL; SRSmaInfo *pRSmaInfo = pItem->pRsmaInfo; @@ -562,7 +573,7 @@ static int32_t tdFetchAndSubmitRSmaResult(SRSmaInfoItem *pItem, int8_t blkType) goto _err; } - if (pReq && tdProcessSubmitReq(sinkTsdb, 1, pReq) < 0) { + if (pReq && tdProcessSubmitReq(sinkTsdb, atomic_add_fetch_64(&pRSmaInfo->pStat->submitVer, 1), pReq) < 0) { taosMemoryFreeClear(pReq); goto _err; } @@ -814,6 +825,7 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) { } if (!taosCheckExistFile(TD_TFILE_FULL_NAME(&tFile))) { + *committed = 0; if (pVnode->state.committed > 0) { smaWarn("vgId:%d, rsma restore for version %" PRIi64 ", not start as %s not exist", TD_VID(pVnode), pVnode->state.committed, TD_TFILE_FULL_NAME(&tFile)); @@ -828,6 +840,18 @@ static int32_t tdRSmaRestoreQTaskInfoReload(SSma *pSma, int64_t *committed) { goto _err; } + STFInfo tFileInfo = {0}; + if (tdLoadTFileHeader(&tFile, &tFileInfo) < 0) { + goto _err; + } + + ASSERT(tFileInfo.qTaskInfo.submitVer > 0); + + SSmaEnv *pRSmaEnv = pSma->pRSmaEnv; + SRSmaStat *pRSmaStat = (SRSmaStat *)SMA_ENV_STAT(pRSmaEnv); + atomic_store_64(&pRSmaStat->submitVer, tFileInfo.qTaskInfo.submitVer); + smaDebug("%s:%d tFileInfo.qTaskInfo.submitVer = %" PRIi64, __func__, __LINE__, tFileInfo.qTaskInfo.submitVer); + SRSmaQTaskInfoIter fIter = {0}; if (tdRSmaQTaskInfoIterInit(&fIter, &tFile) < 0) { tdRSmaQTaskInfoIterDestroy(&fIter); @@ -1094,6 +1118,22 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { } STFile tFile = {0}; + if (RSMA_SUBMIT_VER(pRSmaStat) > 0) { + char qTaskInfoFName[TSDB_FILENAME_LEN]; + tdRSmaQTaskInfoGetFName(vid, pSma->pVnode->state.applied, qTaskInfoFName); + if (tdInitTFile(&tFile, tfsGetPrimaryPath(pVnode->pTfs), qTaskInfoFName) < 0) { + smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr()); + goto _err; + } + if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) { + smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr()); + goto _err; + } + smaDebug("vgId:%d, rsma, serialize qTaskInfo, file %s created", vid, TD_TFILE_FULL_NAME(&tFile)); + + isFileCreated = true; + } + while (infoHash) { SRSmaInfo *pRSmaInfo = *(SRSmaInfo **)infoHash; for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) { @@ -1129,12 +1169,12 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { smaError("vgId:%d, rsma persit, init %s failed since %s", vid, qTaskInfoFName, terrstr()); goto _err; } - if (tdCreateTFile(&tFile, true, -1) < 0) { + if (tdCreateTFile(&tFile, true, TD_FTYPE_RSMA_QTASKINFO) < 0) { smaError("vgId:%d, rsma persit, create %s failed since %s", vid, TD_TFILE_FULL_NAME(&tFile), terrstr()); goto _err; } - smaDebug("vgId:%d, rsma, table %" PRIi64 " level %d serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid, - i + 1, TD_TFILE_FULL_NAME(&tFile)); + smaDebug("vgId:%d, rsma, table %" PRIi64 " serialize qTaskInfo, file %s created", vid, pRSmaInfo->suid, + TD_TFILE_FULL_NAME(&tFile)); isFileCreated = true; } @@ -1161,6 +1201,7 @@ int32_t tdRSmaPersistExecImpl(SRSmaStat *pRSmaStat) { } if (isFileCreated) { + tFile.info.qTaskInfo.submitVer = atomic_load_64(&pRSmaStat->submitVer); if (tdUpdateTFileHeader(&tFile) < 0) { smaError("vgId:%d, rsma, failed to update tfile %s header since %s", vid, TD_TFILE_FULL_NAME(&tFile), tstrerror(terrno)); diff --git a/source/dnode/vnode/src/sma/smaUtil.c b/source/dnode/vnode/src/sma/smaUtil.c index 14caf4144e..2ce6d23648 100644 --- a/source/dnode/vnode/src/sma/smaUtil.c +++ b/source/dnode/vnode/src/sma/smaUtil.c @@ -32,6 +32,9 @@ static int32_t tdEncodeTFInfo(void **buf, STFInfo *pInfo) { tlen += taosEncodeFixedU32(buf, pInfo->ftype); tlen += taosEncodeFixedU32(buf, pInfo->fver); tlen += taosEncodeFixedI64(buf, pInfo->fsize); + if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) { + tlen += taosEncodeFixedI64(buf, pInfo->qTaskInfo.submitVer); + } return tlen; } @@ -41,6 +44,11 @@ static void *tdDecodeTFInfo(void *buf, STFInfo *pInfo) { buf = taosDecodeFixedU32(buf, &(pInfo->ftype)); buf = taosDecodeFixedU32(buf, &(pInfo->fver)); buf = taosDecodeFixedI64(buf, &(pInfo->fsize)); + // specific + if (pInfo->ftype == TD_FTYPE_RSMA_QTASKINFO) { + buf = taosDecodeFixedI64(buf, &(pInfo->qTaskInfo.submitVer)); + } + return buf; } diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index e7ae39f8fe..9c09f50584 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -154,7 +154,8 @@ static void doMergeMultiRows(TSDBROW* pRow, uint64_t uid, SIterInfo *pIter, SArr static void doMergeMemIMemRows(TSDBROW* pRow, TSDBROW* piRow, STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STSRow** pTSRow); static int32_t initDelSkylineIterator(STableBlockScanInfo* pBlockScanInfo, STsdbReader* pReader, STbData* pMemTbData, STbData* piMemTbData); -static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr); +static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idstr, int8_t *pLevel); +static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level); static int32_t setColumnIdSlotList(STsdbReader* pReader, SSDataBlock* pBlock) { SBlockLoadSuppInfo* pSupInfo = &pReader->suppInfo; @@ -366,6 +367,7 @@ static SSDataBlock* createResBlock(SQueryTableDataCond* pCond, int32_t capacity) static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsdbReader** ppReader, const char* idstr) { int32_t code = 0; + int8_t level = 0; STsdbReader* pReader = (STsdbReader*)taosMemoryCalloc(1, sizeof(*pReader)); if (pReader == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -374,13 +376,13 @@ static int32_t tsdbReaderCreate(SVnode* pVnode, SQueryTableDataCond* pCond, STsd initReaderStatus(&pReader->status); - pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows[0].skey, pVnode->config.tsdbCfg.retentions, idstr); + pReader->pTsdb = getTsdbByRetentions(pVnode, pCond->twindows[0].skey, pVnode->config.tsdbCfg.retentions, idstr, &level); pReader->suid = pCond->suid; pReader->order = pCond->order; pReader->capacity = 4096; pReader->idStr = (idstr != NULL)? strdup(idstr):NULL; - pReader->verRange = (SVersionRange) {.minVer = pCond->startVersion, .maxVer = 10000}; - pReader->type = pCond->type; + pReader->verRange = getQueryVerRange(pVnode, pCond, level); + pReader->type = pCond->type; pReader->window = updateQueryTimeWindow(pVnode->pTsdb, pCond->twindows); // todo remove this @@ -2379,12 +2381,13 @@ static int32_t buildBlockFromFiles(STsdbReader* pReader) { } } -STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr) { +static STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions, const char* idStr, + int8_t* pLevel) { if (VND_IS_RSMA(pVnode)) { - int level = 0; + int8_t level = 0; int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision); - for (int i = 0; i < TSDB_RETENTION_MAX; ++i) { + for (int8_t i = 0; i < TSDB_RETENTION_MAX; ++i) { SRetention* pRetention = retentions + level; if (pRetention->keep <= 0) { if (level > 0) { @@ -2398,16 +2401,19 @@ STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions ++level; } - int32_t vgId = TD_VID(pVnode); - const char* str = (idStr != NULL)? idStr:""; + int32_t vgId = TD_VID(pVnode); + const char* str = (idStr != NULL) ? idStr : ""; if (level == TSDB_RETENTION_L0) { + *pLevel = TSDB_RETENTION_L0; tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L0, str); return VND_RSMA0(pVnode); } else if (level == TSDB_RETENTION_L1) { + *pLevel = TSDB_RETENTION_L1; tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L1, str); return VND_RSMA1(pVnode); } else { + *pLevel = TSDB_RETENTION_L2; tsdbDebug("vgId:%d, read handle %p rsma level %d is selected to query %s", vgId, TSDB_RETENTION_L2, str); return VND_RSMA2(pVnode); } @@ -2416,6 +2422,14 @@ STsdb* getTsdbByRetentions(SVnode* pVnode, TSKEY winSKey, SRetention* retentions return VND_TSDB(pVnode); } +static SVersionRange getQueryVerRange(SVnode* pVnode, SQueryTableDataCond* pCond, int8_t level) { + if (VND_IS_RSMA(pVnode)) { + return (SVersionRange){.minVer = pCond->startVersion, .maxVer = tdRSmaGetMaxSubmitVer(pVnode->pSma, level)}; + } + + return (SVersionRange){.minVer = pCond->startVersion, .maxVer = pVnode->state.applied}; +} + // // todo not unref yet, since it is not support multi-group interpolation query // static UNUSED_FUNC void changeQueryHandleForInterpQuery(STsdbReader* pHandle) { // // filter the queried time stamp in the first place