diff --git a/include/util/tbuffer.h b/include/util/tbuffer.h index a964dde720..0516aba81b 100644 --- a/include/util/tbuffer.h +++ b/include/util/tbuffer.h @@ -60,6 +60,7 @@ static int32_t tBufferPutF64(SBuffer *buffer, double value); // SBufferReader #define BUFFER_READER_INITIALIZER(offset, buffer) ((SBufferReader){offset, buffer}) +#define BR_PTR(br) tBufferGetDataAt((br)->buffer, (br)->offset) #define tBufferReaderDestroy(reader) ((void)0) #define tBufferReaderGetOffset(reader) ((reader)->offset) static int32_t tBufferGet(SBufferReader *reader, uint32_t size, void *data); diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index a5a2b1eaca..be9188480f 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -191,18 +191,16 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB // load data tBufferClear(&reader->buffers[0]); - code = tBufferEnsureCapacity(&reader->buffers[0], brinBlk->dp->size); + code = + tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, brinBlk->dp->size, &reader->buffers[0], 0); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, reader->buffers[0].data, brinBlk->dp->size, 0); - TSDB_CHECK_CODE(code, lino, _exit); - reader->buffers[0].size = brinBlk->dp->size; // decode brin block SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); tBrinBlockClear(brinBlock); brinBlock->numOfPKs = brinBlk->numOfPKs; brinBlock->numOfRecords = brinBlk->numRec; - for (int32_t i = 0; i < 10; i++) { + for (int32_t i = 0; i < 10; i++) { // int64_t SCompressInfo cinfo = { .cmprAlg = brinBlk->cmprAlg, .dataType = TSDB_DATA_TYPE_BIGINT, @@ -215,7 +213,7 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB br.offset += brinBlk->size[i]; } - for (int32_t i = 10; i < 15; i++) { + for (int32_t i = 10; i < 15; i++) { // int32_t SCompressInfo cinfo = { .cmprAlg = brinBlk->cmprAlg, .dataType = TSDB_DATA_TYPE_INT, @@ -272,20 +270,21 @@ _exit: return code; } +extern int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist); + int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData) { int32_t code = 0; int32_t lino = 0; // load data tBufferClear(&reader->buffers[0]); - code = tBufferEnsureCapacity(&reader->buffers[0], record->blockSize); + code = + tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockSize, &reader->buffers[0], 0); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->buffers[0].data, record->blockSize, 0); - TSDB_CHECK_CODE(code, lino, _exit); - reader->buffers[0].size = record->blockSize; // decompress - code = tDecmprBlockData(reader->config->bufArr[0], record->blockSize, bData, &reader->config->bufArr[1]); + SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); + code = tBlockDataDecompress(&br, bData, reader->buffers + 1); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -308,238 +307,88 @@ int32_t tBlockColAndColumnCmpr(const void *p1, const void *p2) { } } +extern int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData, + SBuffer *assist); +extern int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br, + SBlockData *blockData, SBuffer *assist); + int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData, STSchema *pTSchema, int16_t cids[], int32_t ncid) { int32_t code = 0; int32_t lino = 0; - int32_t n = 0; SDiskDataHdr hdr; - SBlockCol primaryKeyBlockCols[TD_MAX_PK_COLS]; - // read key part - code = tRealloc(&reader->config->bufArr[0], record->blockKeySize); + // load key part + tBufferClear(&reader->buffers[0]); + code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockKeySize, + &reader->buffers[0], 0); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockKeySize, - 0); - TSDB_CHECK_CODE(code, lino, _exit); - - // decode header - n += tGetDiskDataHdr(reader->config->bufArr[0] + n, &hdr); + // SDiskDataHdr + SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); + br.offset += tGetDiskDataHdr((uint8_t *)BR_PTR(&br), &hdr); tBlockDataReset(bData); bData->suid = hdr.suid; bData->uid = hdr.uid; bData->nRow = hdr.nRow; - // decode key part - for (int32_t i = 0; i < hdr.numOfPKs; i++) { - n += tGetBlockCol(reader->config->bufArr[0] + n, &primaryKeyBlockCols[i]); - } - - // uid - if (hdr.uid == 0) { - ASSERT(0); - } - - // version - code = tsdbDecmprData(reader->config->bufArr[0] + n, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, - (uint8_t **)&bData->aVersion, sizeof(int64_t) * hdr.nRow, &reader->config->bufArr[1]); + // Key part + code = tBlockDataDecompressKeyPart(&hdr, &br, bData, reader->buffers + 1); TSDB_CHECK_CODE(code, lino, _exit); - n += hdr.szVer; - // ts - code = tsdbDecmprData(reader->config->bufArr[0] + n, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg, - (uint8_t **)&bData->aTSKEY, sizeof(TSKEY) * hdr.nRow, &reader->config->bufArr[1]); - TSDB_CHECK_CODE(code, lino, _exit); - n += hdr.szKey; + ASSERT(br.offset == reader->buffers[0].size); - // primary key columns - for (int32_t i = 0; i < hdr.numOfPKs; i++) { - SColData *pColData; - - code = tBlockDataAddColData(bData, primaryKeyBlockCols[i].cid, primaryKeyBlockCols[i].type, - primaryKeyBlockCols[i].cflag, &pColData); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbDecmprColData(reader->config->bufArr[0] + n, &primaryKeyBlockCols[i], hdr.cmprAlg, hdr.nRow, pColData, - &reader->config->bufArr[1]); - TSDB_CHECK_CODE(code, lino, _exit); - - n += (primaryKeyBlockCols[i].szBitmap + primaryKeyBlockCols[i].szOffset + primaryKeyBlockCols[i].szValue); + if (ncid == 0) { + goto _exit; } - ASSERT(n == record->blockKeySize); + // load SBlockCol part + tBufferClear(&reader->buffers[0]); + code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize, hdr.szBlkCol, + &reader->buffers[0], 0); + TSDB_CHECK_CODE(code, lino, _exit); - // regular columns load - bool blockColLoaded = false; - int32_t decodedBufferSize = 0; - SBlockCol blockCol = {.cid = 0}; + // load each column + SBlockCol blockCol = { + .cid = 0, + }; + br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); for (int32_t i = 0; i < ncid; i++) { - SColData *pColData = tBlockDataGetColData(bData, cids[i]); - if (pColData != NULL) continue; + int16_t cid = cids[i]; - // load the column index if not loaded yet - if (!blockColLoaded) { - if (hdr.szBlkCol > 0) { - code = tRealloc(&reader->config->bufArr[0], hdr.szBlkCol); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize, - reader->config->bufArr[0], hdr.szBlkCol, 0); - TSDB_CHECK_CODE(code, lino, _exit); - } - blockColLoaded = true; + if (tBlockDataGetColData(bData, cid)) { + // this column has been loaded + continue; } - // search the column index - for (;;) { - if (blockCol.cid >= cids[i]) { - break; - } - - if (decodedBufferSize >= hdr.szBlkCol) { + while (cid > blockCol.cid) { + if (br.offset >= reader->buffers[0].size) { blockCol.cid = INT16_MAX; break; } - decodedBufferSize += tGetBlockCol(reader->config->bufArr[0] + decodedBufferSize, &blockCol); + br.offset += tGetBlockCol((uint8_t *)BR_PTR(&br), &blockCol); } - STColumn *pTColumn = - taosbsearch(&blockCol, pTSchema->columns, pTSchema->numOfCols, sizeof(STSchema), tBlockColAndColumnCmpr, TD_EQ); - ASSERT(pTColumn != NULL); - - code = tBlockDataAddColData(bData, cids[i], pTColumn->type, pTColumn->flags, &pColData); - TSDB_CHECK_CODE(code, lino, _exit); - - // fill the column data - if (blockCol.cid > cids[i]) { - // set as all NONE - for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) { // all NONE - code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); - TSDB_CHECK_CODE(code, lino, _exit); - } - } else if (blockCol.flag == HAS_NULL) { // all NULL - for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) { - code = tColDataAppendValue(pColData, &COL_VAL_NULL(blockCol.cid, blockCol.type)); - TSDB_CHECK_CODE(code, lino, _exit); - } - } else { - int32_t size1 = blockCol.szBitmap + blockCol.szOffset + blockCol.szValue; - - code = tRealloc(&reader->config->bufArr[1], size1); + if (cid < blockCol.cid) { + // this column as NONE + continue; + } else if (cid == blockCol.cid) { + // load from file + tBufferClear(&reader->buffers[1]); + code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], + record->blockOffset + record->blockKeySize + hdr.szBlkCol + blockCol.offset, + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue, &reader->buffers[1], 0); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], - record->blockOffset + record->blockKeySize + hdr.szBlkCol + blockCol.offset, - reader->config->bufArr[1], size1, 0); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbDecmprColData(reader->config->bufArr[1], &blockCol, hdr.cmprAlg, hdr.nRow, pColData, - &reader->config->bufArr[2]); + // decode the buffer + SBufferReader br1 = BUFFER_READER_INITIALIZER(0, &reader->buffers[1]); + code = tBlockDataDecompressColData(&hdr, &blockCol, &br1, bData, reader->buffers + 2); TSDB_CHECK_CODE(code, lino, _exit); } } -#if 0 - // other columns - if (bData->nColData > 0) { - if (hdr->szBlkCol > 0) { - code = tRealloc(&reader->config->bufArr[0], hdr->szBlkCol); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize, - reader->config->bufArr[0], hdr->szBlkCol, 0); - TSDB_CHECK_CODE(code, lino, _exit); - } - - int64_t szHint = 0; - if (bData->nColData > 3) { - int64_t offset = 0; - SBlockCol bc = {.cid = 0}; - SBlockCol *blockCol = &bc; - - size = 0; - SColData *colData = tBlockDataGetColDataByIdx(bData, 0); - while (blockCol && blockCol->cid < colData->cid) { - if (size < hdr->szBlkCol) { - size += tGetBlockCol(reader->config->bufArr[0] + size, blockCol); - } else { - ASSERT(size == hdr->szBlkCol); - blockCol = NULL; - } - } - - if (blockCol && blockCol->flag == HAS_VALUE) { - offset = blockCol->offset; - - SColData *colDataEnd = tBlockDataGetColDataByIdx(bData, bData->nColData - 1); - while (blockCol && blockCol->cid < colDataEnd->cid) { - if (size < hdr->szBlkCol) { - size += tGetBlockCol(reader->config->bufArr[0] + size, blockCol); - } else { - ASSERT(size == hdr->szBlkCol); - blockCol = NULL; - } - } - - if (blockCol && blockCol->flag == HAS_VALUE) { - szHint = blockCol->offset + blockCol->szBitmap + blockCol->szOffset + blockCol->szValue - offset; - } - } - } - - SBlockCol bc[1] = {{.cid = 0}}; - SBlockCol *blockCol = bc; - - size = 0; - for (int32_t i = 0; i < bData->nColData; i++) { - SColData *colData = tBlockDataGetColDataByIdx(bData, i); - - while (blockCol && blockCol->cid < colData->cid) { - if (size < hdr->szBlkCol) { - size += tGetBlockCol(reader->config->bufArr[0] + size, blockCol); - } else { - ASSERT(size == hdr->szBlkCol); - blockCol = NULL; - } - } - - if (blockCol == NULL || blockCol->cid > colData->cid) { - for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) { - code = tColDataAppendValue(colData, &COL_VAL_NONE(colData->cid, colData->type)); - TSDB_CHECK_CODE(code, lino, _exit); - } - } else { - ASSERT(blockCol->type == colData->type); - ASSERT(blockCol->flag && blockCol->flag != HAS_NONE); - - if (blockCol->flag == HAS_NULL) { - for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) { - code = tColDataAppendValue(colData, &COL_VAL_NULL(blockCol->cid, blockCol->type)); - TSDB_CHECK_CODE(code, lino, _exit); - } - } else { - int32_t size1 = blockCol->szBitmap + blockCol->szOffset + blockCol->szValue; - - code = tRealloc(&reader->config->bufArr[1], size1); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], - record->blockOffset + record->blockKeySize + hdr->szBlkCol + blockCol->offset, - reader->config->bufArr[1], size1, i > 0 ? 0 : szHint); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbDecmprColData(reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData, - &reader->config->bufArr[2]); - TSDB_CHECK_CODE(code, lino, _exit); - } - } - } - } -#endif - _exit: if (code) { TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); @@ -554,23 +403,21 @@ int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *rec TARRAY2_CLEAR(columnDataAggArray, NULL); if (record->smaSize > 0) { - code = tRealloc(&reader->config->bufArr[0], record->smaSize); - TSDB_CHECK_CODE(code, lino, _exit); - - code = tsdbReadFile(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, reader->config->bufArr[0], record->smaSize, 0); + tBufferClear(&reader->buffers[0]); + code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, record->smaSize, &reader->buffers[0], 0); TSDB_CHECK_CODE(code, lino, _exit); // decode sma data - int32_t size = 0; - while (size < record->smaSize) { + SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); + while (br.offset < record->smaSize) { SColumnDataAgg sma[1]; - size += tGetColumnDataAgg(reader->config->bufArr[0] + size, sma); + br.offset += tGetColumnDataAgg((uint8_t *)BR_PTR(&br), sma); code = TARRAY2_APPEND_PTR(columnDataAggArray, sma); TSDB_CHECK_CODE(code, lino, _exit); } - ASSERT(size == record->smaSize); + ASSERT(br.offset == record->smaSize); } _exit: @@ -623,26 +470,26 @@ int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombB int32_t code = 0; int32_t lino = 0; - code = tRealloc(&reader->config->bufArr[0], tombBlk->dp->size); - TSDB_CHECK_CODE(code, lino, _exit); - + tBufferClear(&reader->buffers[0]); code = - tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size, 0); + tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, tombBlk->dp->size, &reader->buffers[0], 0); TSDB_CHECK_CODE(code, lino, _exit); - int32_t size = 0; + int32_t size = 0; + SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); tTombBlockClear(tData); - for (int32_t i = 0; i < ARRAY_SIZE(tData->dataArr); ++i) { - code = tsdbDecmprData(reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg, - &reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec, &reader->config->bufArr[2]); + tData->numOfRecords = tombBlk->numRec; + for (int32_t i = 0; i < ARRAY_SIZE(tData->buffers); ++i) { + SCompressInfo cinfo = { + .cmprAlg = tombBlk->cmprAlg, + .dataType = TSDB_DATA_TYPE_BIGINT, + .originalSize = tombBlk->numRec * sizeof(int64_t), + .compressedSize = tombBlk->size[i], + }; + code = tDecompressDataToBuffer(BR_PTR(&br), cinfo.compressedSize, &cinfo, tData->buffers + i, reader->buffers + 1); TSDB_CHECK_CODE(code, lino, _exit); - - code = TARRAY2_APPEND_BATCH(&tData->dataArr[i], reader->config->bufArr[1], tombBlk->numRec); - TSDB_CHECK_CODE(code, lino, _exit); - - size += tombBlk->size[i]; + br.offset += tombBlk->size[i]; } - ASSERT(size == tombBlk->dp->size); _exit: if (code) { diff --git a/source/dnode/vnode/src/tsdb/tsdbDef.h b/source/dnode/vnode/src/tsdb/tsdbDef.h index 0f512e1306..0eaf3e68a6 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDef.h +++ b/source/dnode/vnode/src/tsdb/tsdbDef.h @@ -35,6 +35,7 @@ extern int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbF extern void tsdbCloseFile(STsdbFD **ppFD); extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size); extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint); +extern int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint); extern int32_t tsdbFsyncFile(STsdbFD *pFD); #ifdef __cplusplus diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index babf8c75fb..1c6750173c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -414,6 +414,18 @@ _exit: return code; } +int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint) { + int32_t code; + + code = tBufferEnsureCapacity(buffer, buffer->size + size); + if (code) return code; + code = tsdbReadFile(pFD, offset, (uint8_t *)tBufferGetDataEnd(buffer), size, szHint); + if (code) return code; + buffer->size += size; + + return code; +} + int32_t tsdbFsyncFile(STsdbFD *pFD) { int32_t code = 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index c9c98070c0..71b70c0cb3 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -1591,18 +1591,167 @@ _exit: return code; } -typedef struct SBlockDataCmprInfo { - // TODO -} SBlockDataCmprInfo; +int32_t tBlockDataCompress(SBlockData *bData, SBuffer *buffer, SBuffer *assist) { + int32_t code = 0; + SDiskDataHdr hdr = {0}; -int32_t tBlockDataCompress(SBlockData *blockData, SBlockDataCmprInfo *info, SBuffer *buffer, SBuffer *assist) { - // TODO - return 0; + // SDiskDataHdr + // br->offset += tGetDiskDataHdr((uint8_t *)tBufferGetDataAt(br->buffer, br->offset), &hdr); + + tBlockDataReset(bData); + +_exit: + return code; } -int32_t tBlockDataDecompress(SBufferReader *reader, const SBlockDataCmprInfo *info, SBlockData *blockData) { - // TODO - return 0; +int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br, + SBlockData *blockData, SBuffer *assist) { + int32_t code = 0; + int32_t lino = 0; + + SColData *colData; + + code = tBlockDataAddColData(blockData, blockCol->cid, blockCol->type, blockCol->cflag, &colData); + TSDB_CHECK_CODE(code, lino, _exit); + + ASSERT(blockCol->flag != HAS_NONE); + + SColDataCompressInfo info = { + .cmprAlg = hdr->cmprAlg, + .columnFlag = blockCol->cflag, + .flag = blockCol->flag, + .dataType = blockCol->type, + .columnId = blockCol->cid, + .numOfData = hdr->nRow, + .bitmapOriginalSize = 0, + .bitmapCompressedSize = blockCol->szBitmap, + .offsetOriginalSize = sizeof(int32_t) * hdr->nRow, + .offsetCompressedSize = blockCol->szOffset, + .dataOriginalSize = blockCol->szOrigin, + .dataCompressedSize = blockCol->szValue, + }; + + switch (blockCol->flag) { + case (HAS_NONE | HAS_NULL | HAS_VALUE): + info.bitmapOriginalSize = BIT2_SIZE(hdr->nRow); + break; + case (HAS_NONE | HAS_NULL): + case (HAS_NONE | HAS_VALUE): + case (HAS_NULL | HAS_VALUE): + info.bitmapOriginalSize = BIT1_SIZE(hdr->nRow); + break; + } + + int32_t totalCompressedSize = blockCol->szBitmap + blockCol->szOffset + blockCol->szValue; + code = tColDataDecompress(BR_PTR(br), totalCompressedSize, &info, colData, assist); + TSDB_CHECK_CODE(code, lino, _exit); + br->offset += totalCompressedSize; + +_exit: + return code; +} + +int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData, + SBuffer *assist) { + int32_t code = 0; + int32_t lino = 0; + SCompressInfo cinfo; + + // uid + if (hdr->szUid > 0) { + cinfo = (SCompressInfo){ + .cmprAlg = hdr->cmprAlg, + .dataType = TSDB_DATA_TYPE_BIGINT, + .compressedSize = hdr->szUid, + .originalSize = sizeof(int64_t) * hdr->nRow, + }; + + code = tRealloc((uint8_t **)&blockData->aUid, cinfo.originalSize); + TSDB_CHECK_CODE(code, lino, _exit); + code = tDecompressData(BR_PTR(br), cinfo.compressedSize, &cinfo, blockData->aUid, cinfo.originalSize, assist); + TSDB_CHECK_CODE(code, lino, _exit); + br->offset += cinfo.compressedSize; + } + + // version + cinfo = (SCompressInfo){ + .cmprAlg = hdr->cmprAlg, + .dataType = TSDB_DATA_TYPE_BIGINT, + .compressedSize = hdr->szVer, + .originalSize = sizeof(int64_t) * hdr->nRow, + }; + code = tRealloc((uint8_t **)&blockData->aVersion, cinfo.originalSize); + TSDB_CHECK_CODE(code, lino, _exit); + code = tDecompressData(BR_PTR(br), cinfo.compressedSize, &cinfo, blockData->aVersion, cinfo.originalSize, assist); + TSDB_CHECK_CODE(code, lino, _exit); + br->offset += cinfo.compressedSize; + + // ts + cinfo = (SCompressInfo){ + .cmprAlg = hdr->cmprAlg, + .dataType = TSDB_DATA_TYPE_TIMESTAMP, + .compressedSize = hdr->szKey, + .originalSize = sizeof(TSKEY) * hdr->nRow, + }; + code = tRealloc((uint8_t **)&blockData->aTSKEY, cinfo.originalSize); + TSDB_CHECK_CODE(code, lino, _exit); + code = tDecompressData(BR_PTR(br), cinfo.compressedSize, &cinfo, blockData->aTSKEY, cinfo.originalSize, assist); + TSDB_CHECK_CODE(code, lino, _exit); + br->offset += cinfo.compressedSize; + + // primary keys + if (hdr->numOfPKs > 0) { + SBlockCol blockCol; + + for (int i = 0; i < hdr->numOfPKs; i++) { + br->offset += tGetBlockCol((uint8_t *)BR_PTR(br), &blockCol); + + ASSERT(blockCol.flag == HAS_VALUE); + ASSERT(blockCol.cflag & COL_IS_KEY); + + code = tBlockDataDecompressColData(hdr, &blockCol, br, blockData, assist); + TSDB_CHECK_CODE(code, lino, _exit); + } + } + +_exit: + return code; +} + +int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist) { + int32_t code = 0; + int32_t lino = 0; + SDiskDataHdr hdr = {0}; + SCompressInfo cinfo; + + // SDiskDataHdr + br->offset += tGetDiskDataHdr((uint8_t *)BR_PTR(br), &hdr); + + tBlockDataReset(blockData); + blockData->suid = hdr.suid; + blockData->uid = hdr.uid; + blockData->nRow = hdr.nRow; + + // Key part + code = tBlockDataDecompressKeyPart(&hdr, br, blockData, assist); + TSDB_CHECK_CODE(code, lino, _exit); + + // Column part + uint8_t *decodePtr = (uint8_t *)BR_PTR(br); + int32_t totalSize = 0; + br->offset += hdr.szBlkCol; + while (totalSize < hdr.szBlkCol) { + SBlockCol blockCol; + int32_t size = tGetBlockCol(decodePtr, &blockCol); + decodePtr += size; + totalSize += size; + + code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist); + TSDB_CHECK_CODE(code, lino, _exit); + } + +_exit: + return code; } // SDiskDataHdr ============================== diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.c b/source/dnode/vnode/src/tsdb/tsdbUtil2.c index 6ea8844320..d137436ff1 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.c @@ -17,39 +17,46 @@ // SDelBlock ---------- int32_t tTombBlockInit(STombBlock *tombBlock) { + tombBlock->numOfRecords = 0; for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { - TARRAY2_INIT(&tombBlock->dataArr[i]); + tBufferInit(&tombBlock->buffers[i]); } return 0; } int32_t tTombBlockDestroy(STombBlock *tombBlock) { + tombBlock->numOfRecords = 0; for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { - TARRAY2_DESTROY(&tombBlock->dataArr[i], NULL); + tBufferDestroy(&tombBlock->buffers[i]); } return 0; } int32_t tTombBlockClear(STombBlock *tombBlock) { + tombBlock->numOfRecords = 0; for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { - TARRAY2_CLEAR(&tombBlock->dataArr[i], NULL); + tBufferClear(&tombBlock->buffers[i]); } return 0; } int32_t tTombBlockPut(STombBlock *tombBlock, const STombRecord *record) { - int32_t code; for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { - code = TARRAY2_APPEND(&tombBlock->dataArr[i], record->dataArr[i]); + int32_t code = tBufferPutI64(&tombBlock->buffers[i], record->dataArr[i]); if (code) return code; } + tombBlock->numOfRecords++; return 0; } int32_t tTombBlockGet(STombBlock *tombBlock, int32_t idx, STombRecord *record) { - if (idx >= TOMB_BLOCK_SIZE(tombBlock)) return TSDB_CODE_OUT_OF_RANGE; + if (idx < 0 || idx >= tombBlock->numOfRecords) { + return TSDB_CODE_OUT_OF_RANGE; + } + for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { - record->dataArr[i] = TARRAY2_GET(&tombBlock->dataArr[i], idx); + SBufferReader br = BUFFER_READER_INITIALIZER(sizeof(int64_t) * idx, &tombBlock->buffers[i]); + tBufferGetI64(&br, &record->dataArr[i]); } return 0; } @@ -225,12 +232,14 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) { ASSERT(record->firstKey.key.numOfPKs == record->lastKey.key.numOfPKs); - if (brinBlock->numOfRecords == 0) { + if (brinBlock->numOfRecords == 0) { // the first row brinBlock->numOfPKs = record->firstKey.key.numOfPKs; + } else if (brinBlock->numOfPKs != record->firstKey.key.numOfPKs) { + // if the number of primary keys are not the same, + // return an error code and the caller should handle it + return TSDB_CODE_INVALID_PARA; } - ASSERT(brinBlock->numOfPKs == record->firstKey.key.numOfPKs); - code = tBufferPutI64(&brinBlock->suids, record->suid); if (code) return code; @@ -243,22 +252,12 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) { code = tBufferPutI64(&brinBlock->firstKeyVersions, record->firstKey.version); if (code) return code; - for (int32_t i = 0; i < record->firstKey.key.numOfPKs; ++i) { - code = tValueColumnAppend(&brinBlock->firstKeyPKs[i], &record->firstKey.key.pks[i]); - if (code) return code; - } - code = tBufferPutI64(&brinBlock->lastKeyTimestamps, record->lastKey.key.ts); if (code) return code; code = tBufferPutI64(&brinBlock->lastKeyVersions, record->lastKey.version); if (code) return code; - for (int32_t i = 0; i < record->lastKey.key.numOfPKs; ++i) { - code = tValueColumnAppend(&brinBlock->lastKeyPKs[i], &record->lastKey.key.pks[i]); - if (code) return code; - } - code = tBufferPutI64(&brinBlock->minVers, record->minVer); if (code) return code; @@ -286,6 +285,18 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) { code = tBufferPutI32(&brinBlock->counts, record->count); if (code) return code; + if (brinBlock->numOfPKs > 0) { + for (int32_t i = 0; i < brinBlock->numOfPKs; ++i) { + code = tValueColumnAppend(&brinBlock->firstKeyPKs[i], &record->firstKey.key.pks[i]); + if (code) return code; + } + + for (int32_t i = 0; i < brinBlock->numOfPKs; ++i) { + code = tValueColumnAppend(&brinBlock->lastKeyPKs[i], &record->lastKey.key.pks[i]); + if (code) return code; + } + } + brinBlock->numOfRecords++; return 0; diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.h b/source/dnode/vnode/src/tsdb/tsdbUtil2.h index 42b62dbdd2..03253ba649 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.h +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.h @@ -35,14 +35,17 @@ typedef union { }; } STombRecord; -typedef union { - TARRAY2(int64_t) dataArr[TOMB_RECORD_ELEM_NUM]; - struct { - TARRAY2(int64_t) suid[1]; - TARRAY2(int64_t) uid[1]; - TARRAY2(int64_t) version[1]; - TARRAY2(int64_t) skey[1]; - TARRAY2(int64_t) ekey[1]; +typedef struct { + int32_t numOfRecords; + union { + SBuffer buffers[TOMB_RECORD_ELEM_NUM]; + struct { + SBuffer suids; + SBuffer uids; + SBuffer versions; + SBuffer skeys; + SBuffer ekeys; + }; }; } STombBlock; @@ -60,7 +63,7 @@ typedef struct { typedef TARRAY2(STombBlk) TTombBlkArray; -#define TOMB_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid) +#define TOMB_BLOCK_SIZE(db) ((db)->numOfRecords) int32_t tTombBlockInit(STombBlock *tombBlock); int32_t tTombBlockDestroy(STombBlock *tombBlock); @@ -137,21 +140,21 @@ typedef struct { union { SBuffer buffers[15]; struct { - SBuffer suids; - SBuffer uids; - SBuffer firstKeyTimestamps; - SBuffer firstKeyVersions; - SBuffer lastKeyTimestamps; - SBuffer lastKeyVersions; - SBuffer minVers; - SBuffer maxVers; - SBuffer blockOffsets; - SBuffer smaOffsets; - SBuffer blockSizes; - SBuffer blockKeySizes; - SBuffer smaSizes; - SBuffer numRows; - SBuffer counts; + SBuffer suids; // int64_t + SBuffer uids; // int64_t + SBuffer firstKeyTimestamps; // int64_t + SBuffer firstKeyVersions; // int64_t + SBuffer lastKeyTimestamps; // int64_t + SBuffer lastKeyVersions; // int64_t + SBuffer minVers; // int64_t + SBuffer maxVers; // int64_t + SBuffer blockOffsets; // int64_t + SBuffer smaOffsets; // int64_t + SBuffer blockSizes; // int32_t + SBuffer blockKeySizes; // int32_t + SBuffer smaSizes; // int32_t + SBuffer numRows; // int32_t + SBuffer counts; // int32_t }; }; SValueColumn firstKeyPKs[TD_MAX_PK_COLS];