From b691e76782011d2c612497e05b25379f28e9880d Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 13 Jul 2022 05:51:37 +0000 Subject: [PATCH] more vnode snapshot --- source/dnode/vnode/src/inc/tsdb.h | 4 + source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 85 +++++++++++---- source/dnode/vnode/src/tsdb/tsdbUtil.c | 116 +++++++++++++++++++++ 3 files changed, 182 insertions(+), 23 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 819076536d..bfb319dce4 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -126,6 +126,8 @@ void tColDataClear(void *ph); int32_t tColDataAppendValue(SColData *pColData, SColVal *pColVal); int32_t tColDataGetValue(SColData *pColData, int32_t iRow, SColVal *pColVal); int32_t tColDataCopy(SColData *pColDataSrc, SColData *pColDataDest); +int32_t tPutColData(uint8_t *p, SColData *pColData); +int32_t tGetColData(uint8_t *p, SColData *pColData); // SBlockData #define tBlockDataFirstRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, 0) #define tBlockDataLastRow(PBLOCKDATA) tsdbRowFromBlockData(PBLOCKDATA, (PBLOCKDATA)->nRow - 1) @@ -142,6 +144,8 @@ int32_t tBlockDataMerge(SBlockData *pBlockData1, SBlockData *pBlockData2, SBlo int32_t tBlockDataCopy(SBlockData *pBlockDataSrc, SBlockData *pBlockDataDest); SColData *tBlockDataGetColDataByIdx(SBlockData *pBlockData, int32_t idx); void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColData); +int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData); +int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData); // SDelIdx int32_t tPutDelIdx(uint8_t *p, void *ph); int32_t tGetDelIdx(uint8_t *p, void *ph); diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 7dc7e29ad7..8bf1b29454 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -29,7 +29,8 @@ struct STsdbSnapReader { SBlockIdx* pBlockIdx; SMapData mBlock; // SMapData int32_t iBlock; - SBlockData blkData; + SBlockData oBlockData; + SBlockData nBlockData; // for del file int8_t delDone; SDelFReader* pDelFReader; @@ -44,16 +45,11 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { while (true) { if (pReader->pDataFReader == NULL) { - SDFileSet* pSet = NULL; - // taosArraySearch(pTsdb->fs->cState->aDFileSet, &(SDFileSet){.fid = pReader->fid}, tDFileSe) + SDFileSet* pSet = tsdbFSStateGetDFileSet(pTsdb->fs->cState, pReader->fid, TD_GT); - // search the next data file set to read (todo) - if (0 /* TODO */) { - code = TSDB_CODE_VND_READ_END; - goto _exit; - } + if (pSet == NULL) goto _exit; - // open + pReader->fid = pSet->fid; code = tsdbDataFReaderOpen(&pReader->pDataFReader, pReader->pTsdb, pSet); if (code) goto _err; @@ -63,6 +59,8 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { pReader->iBlockIdx = 0; pReader->pBlockIdx = NULL; + + tsdbInfo("vgId:%d vnode snapshot tsdb open data file to read, fid:%d", TD_VID(pTsdb->pVnode), pReader->fid); } while (true) { @@ -75,17 +73,15 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { pReader->pBlockIdx = (SBlockIdx*)taosArrayGet(pReader->aBlockIdx, pReader->iBlockIdx); pReader->iBlockIdx++; - // SBlock code = tsdbReadBlock(pReader->pDataFReader, pReader->pBlockIdx, &pReader->mBlock, NULL); if (code) goto _err; pReader->iBlock = 0; } + SBlock block; + SBlock* pBlock = █ while (true) { - SBlock block; - SBlock* pBlock = █ - if (pReader->iBlock >= pReader->mBlock.nItem) { pReader->pBlockIdx = NULL; break; @@ -94,23 +90,63 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { tMapDataGetItemByIdx(&pReader->mBlock, pReader->iBlock, pBlock, tGetBlock); pReader->iBlock++; - if ((pBlock->minVersion >= pReader->sver && pBlock->minVersion <= pReader->ever) || - (pBlock->maxVersion >= pReader->sver && pBlock->maxVersion <= pReader->ever)) { - // overlap (todo) + if (pBlock->minVersion > pReader->ever || pBlock->maxVersion < pReader->sver) continue; - code = tsdbReadBlockData(pReader->pDataFReader, pReader->pBlockIdx, pBlock, &pReader->blkData, NULL, NULL); + code = tsdbReadBlockData(pReader->pDataFReader, pReader->pBlockIdx, pBlock, &pReader->oBlockData, NULL, NULL); + if (code) goto _err; + + // filter + tBlockDataReset(&pReader->nBlockData); + for (int32_t iColData = 0; iColData < taosArrayGetSize(pReader->oBlockData.aIdx); iColData++) { + SColData* pColDataO = tBlockDataGetColDataByIdx(&pReader->oBlockData, iColData); + SColData* pColDataN = NULL; + + code = tBlockDataAddColData(&pReader->nBlockData, taosArrayGetSize(pReader->nBlockData.aIdx), &pColDataN); if (code) goto _err; - goto _exit; + tColDataInit(pColDataN, pColDataO->cid, pColDataO->type, pColDataO->smaOn); } + + for (int32_t iRow = 0; iRow < pReader->oBlockData.nRow; iRow++) { + TSDBROW row = tsdbRowFromBlockData(&pReader->oBlockData, iRow); + int64_t version = TSDBROW_VERSION(&row); + + if (version < pReader->sver || version > pReader->ever) continue; + + code = tBlockDataAppendRow(&pReader->nBlockData, &row, NULL); + if (code) goto _err; + } + + // org data (todo) + int32_t size = sizeof(TABLEID) + tPutBlockData(NULL, &pReader->nBlockData); + + *ppData = taosMemoryMalloc(sizeof(SSnapDataHdr) + size); + if (*ppData == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto _err; + } + + SSnapDataHdr* pHdr = (SSnapDataHdr*)(*ppData); + pHdr->type = 1; + pHdr->size = size; + + TABLEID* pId = (TABLEID*)(&pHdr[1]); + pId->suid = pReader->pBlockIdx->suid; + pId->uid = pReader->pBlockIdx->uid; + + tPutBlockData((uint8_t*)(&pId[1]), &pReader->nBlockData); + + tsdbInfo("vgId:%d vnode snapshot read data, fid:%d suid:%" PRId64 " uid:%" PRId64 + " iBlock:%d minVersion:%d maxVersion:%d nRow:%d out of %d", + TD_VID(pTsdb->pVnode), pReader->fid, pReader->pBlockIdx->suid, pReader->pBlockIdx->uid, + pReader->iBlock - 1, pBlock->minVersion, pBlock->maxVersion, pReader->nBlockData.nRow, pBlock->nRow); + + goto _exit; } } } _exit: - // if (*ppData) { - // tsdbInfo("vgId:%d "); - // } return code; _err: @@ -221,7 +257,9 @@ int32_t tsdbSnapReaderOpen(STsdb* pTsdb, int64_t sver, int64_t ever, STsdbSnapRe goto _err; } pReader->mBlock = tMapDataInit(); - code = tBlockDataInit(&pReader->blkData); + code = tBlockDataInit(&pReader->oBlockData); + if (code) goto _err; + code = tBlockDataInit(&pReader->nBlockData); if (code) goto _err; pReader->aDelIdx = taosArrayInit(0, sizeof(SDelIdx)); @@ -254,7 +292,8 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** ppReader) { } taosArrayDestroy(pReader->aBlockIdx); tMapDataClear(&pReader->mBlock); - tBlockDataClear(&pReader->blkData); + tBlockDataClear(&pReader->oBlockData); + tBlockDataClear(&pReader->nBlockData); if (pReader->pDelFReader) { tsdbDelFReaderClose(&pReader->pDelFReader); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index dc6c823368..db408df944 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -921,6 +921,76 @@ _exit: return code; } +int32_t tPutColData(uint8_t *p, SColData *pColData) { + int32_t n = 0; + + n += tPutI16v(p ? p + n : p, pColData->cid); + n += tPutI8(p ? p + n : p, pColData->type); + n += tPutI8(p ? p + n : p, pColData->smaOn); + n += tPutI32v(p ? p + n : p, pColData->nVal); + n += tPutU8(p ? p + n : p, pColData->flag); + + if (pColData->flag == HAS_NONE || pColData->flag == HAS_NULL) goto _exit; + if (pColData->flag != HAS_VALUE) { + // bitmap + + int32_t size = BIT2_SIZE(pColData->nVal); + if (p) { + memcpy(p + n, pColData->pBitMap, size); + } + n += size; + } + if (IS_VAR_DATA_TYPE(pColData->type)) { + // offset + + int32_t size = sizeof(int32_t) * pColData->nVal; + if (p) { + memcpy(p + n, pColData->aOffset, size); + } + n += size; + } + n += tPutI32v(p ? p + n : p, pColData->nData); + if (p) { + memcpy(p + n, pColData->pData, pColData->nData); + } + n += pColData->nData; + +_exit: + return n; +} + +int32_t tGetColData(uint8_t *p, SColData *pColData) { + int32_t n = 0; + + n += tGetI16v(p + n, &pColData->cid); + n += tGetI8(p + n, &pColData->type); + n += tGetI8(p + n, &pColData->smaOn); + n += tGetI32v(p + n, &pColData->nVal); + n += tGetU8(p + n, &pColData->flag); + + if (pColData->flag == HAS_NONE || pColData->flag == HAS_NULL) goto _exit; + if (pColData->flag != HAS_VALUE) { + // bitmap + + int32_t size = BIT2_SIZE(pColData->nVal); + pColData->pBitMap = p + n; + n += size; + } + if (IS_VAR_DATA_TYPE(pColData->type)) { + // offset + + int32_t size = sizeof(int32_t) * pColData->nVal; + pColData->aOffset = (int32_t *)(p + n); + n += size; + } + n += tGetI32v(p + n, &pColData->nData); + pColData->pData = p + n; + n += pColData->nData; + +_exit: + return n; +} + static FORCE_INLINE int32_t tColDataCmprFn(const void *p1, const void *p2) { SColData *pColData1 = (SColData *)p1; SColData *pColData2 = (SColData *)p2; @@ -1239,6 +1309,52 @@ void tBlockDataGetColData(SBlockData *pBlockData, int16_t cid, SColData **ppColD *ppColData = NULL; } +int32_t tPutBlockData(uint8_t *p, SBlockData *pBlockData) { + int32_t n = 0; + + n += tPutI32v(p ? p + n : p, pBlockData->nRow); + if (p) { + memcpy(p + n, pBlockData->aVersion, sizeof(int64_t) * pBlockData->nRow); + } + n = n + sizeof(int64_t) * pBlockData->nRow; + if (p) { + memcpy(p + n, pBlockData->aTSKEY, sizeof(TSKEY) * pBlockData->nRow); + } + n = n + sizeof(TSKEY) * pBlockData->nRow; + + int32_t nCol = taosArrayGetSize(pBlockData->aIdx); + n += tPutI32v(p ? p + n : p, nCol); + for (int32_t iCol = 0; iCol < nCol; iCol++) { + SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iCol); + n += tPutColData(p ? p + n : p, pColData); + } + + return n; +} + +int32_t tGetBlockData(uint8_t *p, SBlockData *pBlockData) { + int32_t n = 0; + + tBlockDataReset(pBlockData); + + n += tGetI32v(p + n, &pBlockData->nRow); + pBlockData->aVersion = (int64_t *)(p + n); + n = n + sizeof(int64_t) * pBlockData->nRow; + pBlockData->aTSKEY = (TSKEY *)(p + n); + n = n + sizeof(TSKEY) * pBlockData->nRow; + + int32_t nCol; + n += tGetI32v(p + n, &nCol); + for (int32_t iCol = 0; iCol < nCol; iCol++) { + SColData *pColData; + + if (tBlockDataAddColData(pBlockData, iCol, &pColData)) return -1; + n += tGetColData(p + n, pColData); + } + + return n; +} + // ALGORITHM ============================== void tsdbCalcColDataSMA(SColData *pColData, SColumnDataAgg *pColAgg) { SColVal colVal;