From c941ad58ee467a845acbc408e567fcc5fc92501c Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Wed, 6 Mar 2024 17:06:39 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/inc/tsdb.h | 3 + source/dnode/vnode/src/tsdb/tsdbUtil.c | 426 +++++++++++++------------ 2 files changed, 217 insertions(+), 212 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index e5456844d9..6c77222223 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -177,6 +177,9 @@ int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS void tBlockDataClear(SBlockData *pBlockData); int32_t tBlockDataCompress(SBlockData *bData, int8_t cmprAlg, SBuffer *buffers, SBuffer *assist); int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist); +int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData, SBuffer *assist); +int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br, + SBlockData *blockData, SBuffer *assist); SColData *tBlockDataGetColData(SBlockData *pBlockData, int16_t cid); int32_t tBlockDataAddColData(SBlockData *pBlockData, int16_t cid, int8_t type, int8_t cflag, SColData **ppColData); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index ac87c6a41e..baff960955 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -16,6 +16,8 @@ #include "tdataformat.h" #include "tsdb.h" +static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist); + // SMapData ======================================================================= void tMapDataReset(SMapData *pMapData) { pMapData->nItem = 0; @@ -126,15 +128,15 @@ _exit: int32_t tPutMapData(uint8_t *p, SMapData *pMapData) { int32_t n = 0; - n += tPutI32v(p ? p + n : p, pMapData->nItem, true); + n += tPutI32v(p ? p + n : p, pMapData->nItem); if (pMapData->nItem) { int32_t lOffset = 0; for (int32_t iItem = 0; iItem < pMapData->nItem; iItem++) { - n += tPutI32v(p ? p + n : p, pMapData->aOffset[iItem] - lOffset, true); + n += tPutI32v(p ? p + n : p, pMapData->aOffset[iItem] - lOffset); lOffset = pMapData->aOffset[iItem]; } - n += tPutI32v(p ? p + n : p, pMapData->nData, true); + n += tPutI32v(p ? p + n : p, pMapData->nData); if (p) { memcpy(p + n, pMapData->pData, pMapData->nData); } @@ -197,10 +199,10 @@ int32_t tPutBlockIdx(uint8_t *p, void *ph) { int32_t n = 0; SBlockIdx *pBlockIdx = (SBlockIdx *)ph; - n += tPutI64(p ? p + n : p, pBlockIdx->suid, true); - n += tPutI64(p ? p + n : p, pBlockIdx->uid, true); - n += tPutI64v(p ? p + n : p, pBlockIdx->offset, true); - n += tPutI64v(p ? p + n : p, pBlockIdx->size, true); + n += tPutI64(p ? p + n : p, pBlockIdx->suid); + n += tPutI64(p ? p + n : p, pBlockIdx->uid); + n += tPutI64v(p ? p + n : p, pBlockIdx->offset); + n += tPutI64v(p ? p + n : p, pBlockIdx->size); return n; } @@ -266,23 +268,23 @@ int32_t tPutDataBlk(uint8_t *p, void *ph) { int32_t n = 0; SDataBlk *pDataBlk = (SDataBlk *)ph; - n += tPutI64v(p ? p + n : p, pDataBlk->minKey.version, true); - n += tPutI64v(p ? p + n : p, pDataBlk->minKey.ts, true); - n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.version, true); - n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.ts, true); - n += tPutI64v(p ? p + n : p, pDataBlk->minVer, true); - n += tPutI64v(p ? p + n : p, pDataBlk->maxVer, true); - n += tPutI32v(p ? p + n : p, pDataBlk->nRow, true); - n += tPutI8(p ? p + n : p, pDataBlk->hasDup, true); - n += tPutI8(p ? p + n : p, pDataBlk->nSubBlock, true); + n += tPutI64v(p ? p + n : p, pDataBlk->minKey.version); + n += tPutI64v(p ? p + n : p, pDataBlk->minKey.ts); + n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.version); + n += tPutI64v(p ? p + n : p, pDataBlk->maxKey.ts); + n += tPutI64v(p ? p + n : p, pDataBlk->minVer); + n += tPutI64v(p ? p + n : p, pDataBlk->maxVer); + n += tPutI32v(p ? p + n : p, pDataBlk->nRow); + n += tPutI8(p ? p + n : p, pDataBlk->hasDup); + n += tPutI8(p ? p + n : p, pDataBlk->nSubBlock); for (int8_t iSubBlock = 0; iSubBlock < pDataBlk->nSubBlock; iSubBlock++) { - n += tPutI64v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].offset, true); - n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szBlock, true); - n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szKey, true); + n += tPutI64v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].offset); + n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szBlock); + n += tPutI32v(p ? p + n : p, pDataBlk->aSubBlock[iSubBlock].szKey); } if (pDataBlk->nSubBlock == 1 && !pDataBlk->hasDup) { - n += tPutI64v(p ? p + n : p, pDataBlk->smaInfo.offset, true); - n += tPutI32v(p ? p + n : p, pDataBlk->smaInfo.size, true); + n += tPutI64v(p ? p + n : p, pDataBlk->smaInfo.offset); + n += tPutI32v(p ? p + n : p, pDataBlk->smaInfo.size); } return n; @@ -344,17 +346,17 @@ int32_t tPutSttBlk(uint8_t *p, void *ph) { int32_t n = 0; SSttBlk *pSttBlk = (SSttBlk *)ph; - n += tPutI64(p ? p + n : p, pSttBlk->suid, true); - n += tPutI64(p ? p + n : p, pSttBlk->minUid, true); - n += tPutI64(p ? p + n : p, pSttBlk->maxUid, true); - n += tPutI64v(p ? p + n : p, pSttBlk->minKey, true); - n += tPutI64v(p ? p + n : p, pSttBlk->maxKey, true); - n += tPutI64v(p ? p + n : p, pSttBlk->minVer, true); - n += tPutI64v(p ? p + n : p, pSttBlk->maxVer, true); - n += tPutI32v(p ? p + n : p, pSttBlk->nRow, true); - n += tPutI64v(p ? p + n : p, pSttBlk->bInfo.offset, true); - n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szBlock, true); - n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szKey, true); + n += tPutI64(p ? p + n : p, pSttBlk->suid); + n += tPutI64(p ? p + n : p, pSttBlk->minUid); + n += tPutI64(p ? p + n : p, pSttBlk->maxUid); + n += tPutI64v(p ? p + n : p, pSttBlk->minKey); + n += tPutI64v(p ? p + n : p, pSttBlk->maxKey); + n += tPutI64v(p ? p + n : p, pSttBlk->minVer); + n += tPutI64v(p ? p + n : p, pSttBlk->maxVer); + n += tPutI32v(p ? p + n : p, pSttBlk->nRow); + n += tPutI64v(p ? p + n : p, pSttBlk->bInfo.offset); + n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szBlock); + n += tPutI32v(p ? p + n : p, pSttBlk->bInfo.szKey); return n; } @@ -1378,77 +1380,6 @@ SColData *tBlockDataGetColData(SBlockData *pBlockData, int16_t cid) { return NULL; } -static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist) { - int32_t code = 0; - int32_t lino = 0; - SCompressInfo cinfo; - - // uid - if (bData->uid == 0) { - cinfo = (SCompressInfo){ - .cmprAlg = hdr->cmprAlg, - .dataType = TSDB_DATA_TYPE_BIGINT, - .originalSize = sizeof(int64_t) * bData->nRow, - }; - code = tCompressDataToBuffer(bData->aUid, &cinfo, buffer, assist); - TSDB_CHECK_CODE(code, lino, _exit); - hdr->szUid = cinfo.compressedSize; - } - - // version - cinfo = (SCompressInfo){ - .cmprAlg = hdr->cmprAlg, - .dataType = TSDB_DATA_TYPE_BIGINT, - .originalSize = sizeof(int64_t) * bData->nRow, - }; - code = tCompressDataToBuffer((uint8_t *)bData->aVersion, &cinfo, buffer, assist); - TSDB_CHECK_CODE(code, lino, _exit); - hdr->szVer = cinfo.compressedSize; - - // ts - cinfo = (SCompressInfo){ - .cmprAlg = hdr->cmprAlg, - .dataType = TSDB_DATA_TYPE_TIMESTAMP, - .originalSize = sizeof(TSKEY) * bData->nRow, - }; - code = tCompressDataToBuffer((uint8_t *)bData->aTSKEY, &cinfo, buffer, assist); - TSDB_CHECK_CODE(code, lino, _exit); - hdr->szKey = cinfo.compressedSize; - - // primary keys - for (hdr->numOfPKs = 0; hdr->numOfPKs < bData->nColData; hdr->numOfPKs++) { - ASSERT(hdr->numOfPKs <= TD_MAX_PK_COLS); - - SBlockCol *blockCol = &hdr->primaryBlockCols[hdr->numOfPKs]; - SColData *colData = tBlockDataGetColDataByIdx(bData, hdr->numOfPKs); - - if ((colData->cflag & COL_IS_KEY) == 0) { - break; - } - - SColDataCompressInfo info = { - .cmprAlg = hdr->cmprAlg, - }; - code = tColDataCompress(colData, &info, buffer, assist); - TSDB_CHECK_CODE(code, lino, _exit); - - *blockCol = (SBlockCol){ - .cid = info.columnId, - .type = info.dataType, - .cflag = info.columnFlag, - .flag = info.flag, - .szOrigin = info.dataOriginalSize, - .szBitmap = info.bitmapCompressedSize, - .szOffset = info.offsetCompressedSize, - .szValue = info.dataCompressedSize, - .offset = 0, - }; - } - -_exit: - return code; -} - /* buffers[0]: SDiskDataHdr * buffers[1]: key part: uid + version + ts + primary keys * buffers[2]: SBlockCol part @@ -1523,115 +1454,6 @@ _exit: return code; } -int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br, - SBlockData *blockData, SBuffer *assist) { - int32_t code = 0; - int32_t lino = 0; - - SColData *colData; - - code = tBlockDataAddColData(blockData, blockCol->cid, blockCol->type, blockCol->cflag, &colData); - TSDB_CHECK_CODE(code, lino, _exit); - - ASSERT(blockCol->flag != HAS_NONE); - - SColDataCompressInfo info = { - .cmprAlg = hdr->cmprAlg, - .columnFlag = blockCol->cflag, - .flag = blockCol->flag, - .dataType = blockCol->type, - .columnId = blockCol->cid, - .numOfData = hdr->nRow, - .bitmapOriginalSize = 0, - .bitmapCompressedSize = blockCol->szBitmap, - .offsetOriginalSize = sizeof(int32_t) * hdr->nRow, - .offsetCompressedSize = blockCol->szOffset, - .dataOriginalSize = blockCol->szOrigin, - .dataCompressedSize = blockCol->szValue, - }; - - switch (blockCol->flag) { - case (HAS_NONE | HAS_NULL | HAS_VALUE): - info.bitmapOriginalSize = BIT2_SIZE(hdr->nRow); - break; - case (HAS_NONE | HAS_NULL): - case (HAS_NONE | HAS_VALUE): - case (HAS_NULL | HAS_VALUE): - info.bitmapOriginalSize = BIT1_SIZE(hdr->nRow); - break; - } - - code = tColDataDecompress(BR_PTR(br), &info, colData, assist); - TSDB_CHECK_CODE(code, lino, _exit); - br->offset += blockCol->szBitmap + blockCol->szOffset + blockCol->szValue; - -_exit: - return code; -} - -int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData, - SBuffer *assist) { - int32_t code = 0; - int32_t lino = 0; - SCompressInfo cinfo; - - // uid - if (hdr->szUid > 0) { - cinfo = (SCompressInfo){ - .cmprAlg = hdr->cmprAlg, - .dataType = TSDB_DATA_TYPE_BIGINT, - .compressedSize = hdr->szUid, - .originalSize = sizeof(int64_t) * hdr->nRow, - }; - - code = tRealloc((uint8_t **)&blockData->aUid, cinfo.originalSize); - TSDB_CHECK_CODE(code, lino, _exit); - code = tDecompressData(BR_PTR(br), &cinfo, blockData->aUid, cinfo.originalSize, assist); - TSDB_CHECK_CODE(code, lino, _exit); - br->offset += cinfo.compressedSize; - } - - // version - cinfo = (SCompressInfo){ - .cmprAlg = hdr->cmprAlg, - .dataType = TSDB_DATA_TYPE_BIGINT, - .compressedSize = hdr->szVer, - .originalSize = sizeof(int64_t) * hdr->nRow, - }; - code = tRealloc((uint8_t **)&blockData->aVersion, cinfo.originalSize); - TSDB_CHECK_CODE(code, lino, _exit); - code = tDecompressData(BR_PTR(br), &cinfo, blockData->aVersion, cinfo.originalSize, assist); - TSDB_CHECK_CODE(code, lino, _exit); - br->offset += cinfo.compressedSize; - - // ts - cinfo = (SCompressInfo){ - .cmprAlg = hdr->cmprAlg, - .dataType = TSDB_DATA_TYPE_TIMESTAMP, - .compressedSize = hdr->szKey, - .originalSize = sizeof(TSKEY) * hdr->nRow, - }; - code = tRealloc((uint8_t **)&blockData->aTSKEY, cinfo.originalSize); - TSDB_CHECK_CODE(code, lino, _exit); - code = tDecompressData(BR_PTR(br), &cinfo, blockData->aTSKEY, cinfo.originalSize, assist); - TSDB_CHECK_CODE(code, lino, _exit); - br->offset += cinfo.compressedSize; - - // primary keys - for (int i = 0; i < hdr->numOfPKs; i++) { - const SBlockCol *blockCol = &hdr->primaryBlockCols[i]; - - ASSERT(blockCol->flag == HAS_VALUE); - ASSERT(blockCol->cflag & COL_IS_KEY); - - code = tBlockDataDecompressColData(hdr, blockCol, br, blockData, assist); - TSDB_CHECK_CODE(code, lino, _exit); - } - -_exit: - return code; -} - int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist) { int32_t code = 0; int32_t lino = 0; @@ -1740,3 +1562,183 @@ int32_t tGetColumnDataAgg(SBufferReader *br, SColumnDataAgg *pColAgg) { return 0; } + +static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist) { + int32_t code = 0; + int32_t lino = 0; + SCompressInfo cinfo; + + // uid + if (bData->uid == 0) { + cinfo = (SCompressInfo){ + .cmprAlg = hdr->cmprAlg, + .dataType = TSDB_DATA_TYPE_BIGINT, + .originalSize = sizeof(int64_t) * bData->nRow, + }; + code = tCompressDataToBuffer(bData->aUid, &cinfo, buffer, assist); + TSDB_CHECK_CODE(code, lino, _exit); + hdr->szUid = cinfo.compressedSize; + } + + // version + cinfo = (SCompressInfo){ + .cmprAlg = hdr->cmprAlg, + .dataType = TSDB_DATA_TYPE_BIGINT, + .originalSize = sizeof(int64_t) * bData->nRow, + }; + code = tCompressDataToBuffer((uint8_t *)bData->aVersion, &cinfo, buffer, assist); + TSDB_CHECK_CODE(code, lino, _exit); + hdr->szVer = cinfo.compressedSize; + + // ts + cinfo = (SCompressInfo){ + .cmprAlg = hdr->cmprAlg, + .dataType = TSDB_DATA_TYPE_TIMESTAMP, + .originalSize = sizeof(TSKEY) * bData->nRow, + }; + code = tCompressDataToBuffer((uint8_t *)bData->aTSKEY, &cinfo, buffer, assist); + TSDB_CHECK_CODE(code, lino, _exit); + hdr->szKey = cinfo.compressedSize; + + // primary keys + for (hdr->numOfPKs = 0; hdr->numOfPKs < bData->nColData; hdr->numOfPKs++) { + ASSERT(hdr->numOfPKs <= TD_MAX_PK_COLS); + + SBlockCol *blockCol = &hdr->primaryBlockCols[hdr->numOfPKs]; + SColData *colData = tBlockDataGetColDataByIdx(bData, hdr->numOfPKs); + + if ((colData->cflag & COL_IS_KEY) == 0) { + break; + } + + SColDataCompressInfo info = { + .cmprAlg = hdr->cmprAlg, + }; + code = tColDataCompress(colData, &info, buffer, assist); + TSDB_CHECK_CODE(code, lino, _exit); + + *blockCol = (SBlockCol){ + .cid = info.columnId, + .type = info.dataType, + .cflag = info.columnFlag, + .flag = info.flag, + .szOrigin = info.dataOriginalSize, + .szBitmap = info.bitmapCompressedSize, + .szOffset = info.offsetCompressedSize, + .szValue = info.dataCompressedSize, + .offset = 0, + }; + } + +_exit: + return code; +} + +int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br, + SBlockData *blockData, SBuffer *assist) { + int32_t code = 0; + int32_t lino = 0; + + SColData *colData; + + code = tBlockDataAddColData(blockData, blockCol->cid, blockCol->type, blockCol->cflag, &colData); + TSDB_CHECK_CODE(code, lino, _exit); + + ASSERT(blockCol->flag != HAS_NONE); + + SColDataCompressInfo info = { + .cmprAlg = hdr->cmprAlg, + .columnFlag = blockCol->cflag, + .flag = blockCol->flag, + .dataType = blockCol->type, + .columnId = blockCol->cid, + .numOfData = hdr->nRow, + .bitmapOriginalSize = 0, + .bitmapCompressedSize = blockCol->szBitmap, + .offsetOriginalSize = sizeof(int32_t) * hdr->nRow, + .offsetCompressedSize = blockCol->szOffset, + .dataOriginalSize = blockCol->szOrigin, + .dataCompressedSize = blockCol->szValue, + }; + + switch (blockCol->flag) { + case (HAS_NONE | HAS_NULL | HAS_VALUE): + info.bitmapOriginalSize = BIT2_SIZE(hdr->nRow); + break; + case (HAS_NONE | HAS_NULL): + case (HAS_NONE | HAS_VALUE): + case (HAS_NULL | HAS_VALUE): + info.bitmapOriginalSize = BIT1_SIZE(hdr->nRow); + break; + } + + code = tColDataDecompress(BR_PTR(br), &info, colData, assist); + TSDB_CHECK_CODE(code, lino, _exit); + br->offset += blockCol->szBitmap + blockCol->szOffset + blockCol->szValue; + +_exit: + return code; +} + +int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData, + SBuffer *assist) { + int32_t code = 0; + int32_t lino = 0; + SCompressInfo cinfo; + + // uid + if (hdr->szUid > 0) { + cinfo = (SCompressInfo){ + .cmprAlg = hdr->cmprAlg, + .dataType = TSDB_DATA_TYPE_BIGINT, + .compressedSize = hdr->szUid, + .originalSize = sizeof(int64_t) * hdr->nRow, + }; + + code = tRealloc((uint8_t **)&blockData->aUid, cinfo.originalSize); + TSDB_CHECK_CODE(code, lino, _exit); + code = tDecompressData(BR_PTR(br), &cinfo, blockData->aUid, cinfo.originalSize, assist); + TSDB_CHECK_CODE(code, lino, _exit); + br->offset += cinfo.compressedSize; + } + + // version + cinfo = (SCompressInfo){ + .cmprAlg = hdr->cmprAlg, + .dataType = TSDB_DATA_TYPE_BIGINT, + .compressedSize = hdr->szVer, + .originalSize = sizeof(int64_t) * hdr->nRow, + }; + code = tRealloc((uint8_t **)&blockData->aVersion, cinfo.originalSize); + TSDB_CHECK_CODE(code, lino, _exit); + code = tDecompressData(BR_PTR(br), &cinfo, blockData->aVersion, cinfo.originalSize, assist); + TSDB_CHECK_CODE(code, lino, _exit); + br->offset += cinfo.compressedSize; + + // ts + cinfo = (SCompressInfo){ + .cmprAlg = hdr->cmprAlg, + .dataType = TSDB_DATA_TYPE_TIMESTAMP, + .compressedSize = hdr->szKey, + .originalSize = sizeof(TSKEY) * hdr->nRow, + }; + code = tRealloc((uint8_t **)&blockData->aTSKEY, cinfo.originalSize); + TSDB_CHECK_CODE(code, lino, _exit); + code = tDecompressData(BR_PTR(br), &cinfo, blockData->aTSKEY, cinfo.originalSize, assist); + TSDB_CHECK_CODE(code, lino, _exit); + br->offset += cinfo.compressedSize; + + // primary keys + for (int i = 0; i < hdr->numOfPKs; i++) { + const SBlockCol *blockCol = &hdr->primaryBlockCols[i]; + + ASSERT(blockCol->flag == HAS_VALUE); + ASSERT(blockCol->cflag & COL_IS_KEY); + + code = tBlockDataDecompressColData(hdr, blockCol, br, blockData, assist); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + return code; +} \ No newline at end of file