more code

This commit is contained in:
Hongze Cheng 2024-03-04 19:25:20 +08:00
parent 7aa2dd1f5f
commit 40a2b06451
7 changed files with 306 additions and 282 deletions

View File

@ -60,6 +60,7 @@ static int32_t tBufferPutF64(SBuffer *buffer, double value);
// SBufferReader // SBufferReader
#define BUFFER_READER_INITIALIZER(offset, buffer) ((SBufferReader){offset, buffer}) #define BUFFER_READER_INITIALIZER(offset, buffer) ((SBufferReader){offset, buffer})
#define BR_PTR(br) tBufferGetDataAt((br)->buffer, (br)->offset)
#define tBufferReaderDestroy(reader) ((void)0) #define tBufferReaderDestroy(reader) ((void)0)
#define tBufferReaderGetOffset(reader) ((reader)->offset) #define tBufferReaderGetOffset(reader) ((reader)->offset)
static int32_t tBufferGet(SBufferReader *reader, uint32_t size, void *data); static int32_t tBufferGet(SBufferReader *reader, uint32_t size, void *data);

View File

@ -191,18 +191,16 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB
// load data // load data
tBufferClear(&reader->buffers[0]); tBufferClear(&reader->buffers[0]);
code = tBufferEnsureCapacity(&reader->buffers[0], brinBlk->dp->size); code =
tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, brinBlk->dp->size, &reader->buffers[0], 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, reader->buffers[0].data, brinBlk->dp->size, 0);
TSDB_CHECK_CODE(code, lino, _exit);
reader->buffers[0].size = brinBlk->dp->size;
// decode brin block // decode brin block
SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
tBrinBlockClear(brinBlock); tBrinBlockClear(brinBlock);
brinBlock->numOfPKs = brinBlk->numOfPKs; brinBlock->numOfPKs = brinBlk->numOfPKs;
brinBlock->numOfRecords = brinBlk->numRec; brinBlock->numOfRecords = brinBlk->numRec;
for (int32_t i = 0; i < 10; i++) { for (int32_t i = 0; i < 10; i++) { // int64_t
SCompressInfo cinfo = { SCompressInfo cinfo = {
.cmprAlg = brinBlk->cmprAlg, .cmprAlg = brinBlk->cmprAlg,
.dataType = TSDB_DATA_TYPE_BIGINT, .dataType = TSDB_DATA_TYPE_BIGINT,
@ -215,7 +213,7 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB
br.offset += brinBlk->size[i]; br.offset += brinBlk->size[i];
} }
for (int32_t i = 10; i < 15; i++) { for (int32_t i = 10; i < 15; i++) { // int32_t
SCompressInfo cinfo = { SCompressInfo cinfo = {
.cmprAlg = brinBlk->cmprAlg, .cmprAlg = brinBlk->cmprAlg,
.dataType = TSDB_DATA_TYPE_INT, .dataType = TSDB_DATA_TYPE_INT,
@ -272,20 +270,21 @@ _exit:
return code; return code;
} }
extern int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist);
int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData) { int32_t tsdbDataFileReadBlockData(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
// load data // load data
tBufferClear(&reader->buffers[0]); tBufferClear(&reader->buffers[0]);
code = tBufferEnsureCapacity(&reader->buffers[0], record->blockSize); code =
tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockSize, &reader->buffers[0], 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->buffers[0].data, record->blockSize, 0);
TSDB_CHECK_CODE(code, lino, _exit);
reader->buffers[0].size = record->blockSize;
// decompress // decompress
code = tDecmprBlockData(reader->config->bufArr[0], record->blockSize, bData, &reader->config->bufArr[1]); SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
code = tBlockDataDecompress(&br, bData, reader->buffers + 1);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
_exit: _exit:
@ -308,238 +307,88 @@ int32_t tBlockColAndColumnCmpr(const void *p1, const void *p2) {
} }
} }
extern int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData,
SBuffer *assist);
extern int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br,
SBlockData *blockData, SBuffer *assist);
int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData, int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData,
STSchema *pTSchema, int16_t cids[], int32_t ncid) { STSchema *pTSchema, int16_t cids[], int32_t ncid) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
int32_t n = 0;
SDiskDataHdr hdr; SDiskDataHdr hdr;
SBlockCol primaryKeyBlockCols[TD_MAX_PK_COLS];
// read key part // load key part
code = tRealloc(&reader->config->bufArr[0], record->blockKeySize); tBufferClear(&reader->buffers[0]);
code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockKeySize,
&reader->buffers[0], 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, reader->config->bufArr[0], record->blockKeySize, // SDiskDataHdr
0); SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
TSDB_CHECK_CODE(code, lino, _exit); br.offset += tGetDiskDataHdr((uint8_t *)BR_PTR(&br), &hdr);
// decode header
n += tGetDiskDataHdr(reader->config->bufArr[0] + n, &hdr);
tBlockDataReset(bData); tBlockDataReset(bData);
bData->suid = hdr.suid; bData->suid = hdr.suid;
bData->uid = hdr.uid; bData->uid = hdr.uid;
bData->nRow = hdr.nRow; bData->nRow = hdr.nRow;
// decode key part // Key part
for (int32_t i = 0; i < hdr.numOfPKs; i++) { code = tBlockDataDecompressKeyPart(&hdr, &br, bData, reader->buffers + 1);
n += tGetBlockCol(reader->config->bufArr[0] + n, &primaryKeyBlockCols[i]);
}
// uid
if (hdr.uid == 0) {
ASSERT(0);
}
// version
code = tsdbDecmprData(reader->config->bufArr[0] + n, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg,
(uint8_t **)&bData->aVersion, sizeof(int64_t) * hdr.nRow, &reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
n += hdr.szVer;
// ts ASSERT(br.offset == reader->buffers[0].size);
code = tsdbDecmprData(reader->config->bufArr[0] + n, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg,
(uint8_t **)&bData->aTSKEY, sizeof(TSKEY) * hdr.nRow, &reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
n += hdr.szKey;
// primary key columns if (ncid == 0) {
for (int32_t i = 0; i < hdr.numOfPKs; i++) { goto _exit;
SColData *pColData;
code = tBlockDataAddColData(bData, primaryKeyBlockCols[i].cid, primaryKeyBlockCols[i].type,
primaryKeyBlockCols[i].cflag, &pColData);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDecmprColData(reader->config->bufArr[0] + n, &primaryKeyBlockCols[i], hdr.cmprAlg, hdr.nRow, pColData,
&reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
n += (primaryKeyBlockCols[i].szBitmap + primaryKeyBlockCols[i].szOffset + primaryKeyBlockCols[i].szValue);
} }
ASSERT(n == record->blockKeySize); // load SBlockCol part
tBufferClear(&reader->buffers[0]);
code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize, hdr.szBlkCol,
&reader->buffers[0], 0);
TSDB_CHECK_CODE(code, lino, _exit);
// regular columns load // load each column
bool blockColLoaded = false; SBlockCol blockCol = {
int32_t decodedBufferSize = 0; .cid = 0,
SBlockCol blockCol = {.cid = 0}; };
br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
for (int32_t i = 0; i < ncid; i++) { for (int32_t i = 0; i < ncid; i++) {
SColData *pColData = tBlockDataGetColData(bData, cids[i]); int16_t cid = cids[i];
if (pColData != NULL) continue;
// load the column index if not loaded yet if (tBlockDataGetColData(bData, cid)) {
if (!blockColLoaded) { // this column has been loaded
if (hdr.szBlkCol > 0) { continue;
code = tRealloc(&reader->config->bufArr[0], hdr.szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize,
reader->config->bufArr[0], hdr.szBlkCol, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
blockColLoaded = true;
} }
// search the column index while (cid > blockCol.cid) {
for (;;) { if (br.offset >= reader->buffers[0].size) {
if (blockCol.cid >= cids[i]) {
break;
}
if (decodedBufferSize >= hdr.szBlkCol) {
blockCol.cid = INT16_MAX; blockCol.cid = INT16_MAX;
break; break;
} }
decodedBufferSize += tGetBlockCol(reader->config->bufArr[0] + decodedBufferSize, &blockCol); br.offset += tGetBlockCol((uint8_t *)BR_PTR(&br), &blockCol);
} }
STColumn *pTColumn = if (cid < blockCol.cid) {
taosbsearch(&blockCol, pTSchema->columns, pTSchema->numOfCols, sizeof(STSchema), tBlockColAndColumnCmpr, TD_EQ); // this column as NONE
ASSERT(pTColumn != NULL); continue;
} else if (cid == blockCol.cid) {
code = tBlockDataAddColData(bData, cids[i], pTColumn->type, pTColumn->flags, &pColData); // load from file
TSDB_CHECK_CODE(code, lino, _exit); tBufferClear(&reader->buffers[1]);
code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA],
// fill the column data record->blockOffset + record->blockKeySize + hdr.szBlkCol + blockCol.offset,
if (blockCol.cid > cids[i]) { blockCol.szBitmap + blockCol.szOffset + blockCol.szValue, &reader->buffers[1], 0);
// set as all NONE
for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) { // all NONE
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
TSDB_CHECK_CODE(code, lino, _exit);
}
} else if (blockCol.flag == HAS_NULL) { // all NULL
for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NULL(blockCol.cid, blockCol.type));
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
int32_t size1 = blockCol.szBitmap + blockCol.szOffset + blockCol.szValue;
code = tRealloc(&reader->config->bufArr[1], size1);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], // decode the buffer
record->blockOffset + record->blockKeySize + hdr.szBlkCol + blockCol.offset, SBufferReader br1 = BUFFER_READER_INITIALIZER(0, &reader->buffers[1]);
reader->config->bufArr[1], size1, 0); code = tBlockDataDecompressColData(&hdr, &blockCol, &br1, bData, reader->buffers + 2);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDecmprColData(reader->config->bufArr[1], &blockCol, hdr.cmprAlg, hdr.nRow, pColData,
&reader->config->bufArr[2]);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
} }
#if 0
// other columns
if (bData->nColData > 0) {
if (hdr->szBlkCol > 0) {
code = tRealloc(&reader->config->bufArr[0], hdr->szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize,
reader->config->bufArr[0], hdr->szBlkCol, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
int64_t szHint = 0;
if (bData->nColData > 3) {
int64_t offset = 0;
SBlockCol bc = {.cid = 0};
SBlockCol *blockCol = &bc;
size = 0;
SColData *colData = tBlockDataGetColDataByIdx(bData, 0);
while (blockCol && blockCol->cid < colData->cid) {
if (size < hdr->szBlkCol) {
size += tGetBlockCol(reader->config->bufArr[0] + size, blockCol);
} else {
ASSERT(size == hdr->szBlkCol);
blockCol = NULL;
}
}
if (blockCol && blockCol->flag == HAS_VALUE) {
offset = blockCol->offset;
SColData *colDataEnd = tBlockDataGetColDataByIdx(bData, bData->nColData - 1);
while (blockCol && blockCol->cid < colDataEnd->cid) {
if (size < hdr->szBlkCol) {
size += tGetBlockCol(reader->config->bufArr[0] + size, blockCol);
} else {
ASSERT(size == hdr->szBlkCol);
blockCol = NULL;
}
}
if (blockCol && blockCol->flag == HAS_VALUE) {
szHint = blockCol->offset + blockCol->szBitmap + blockCol->szOffset + blockCol->szValue - offset;
}
}
}
SBlockCol bc[1] = {{.cid = 0}};
SBlockCol *blockCol = bc;
size = 0;
for (int32_t i = 0; i < bData->nColData; i++) {
SColData *colData = tBlockDataGetColDataByIdx(bData, i);
while (blockCol && blockCol->cid < colData->cid) {
if (size < hdr->szBlkCol) {
size += tGetBlockCol(reader->config->bufArr[0] + size, blockCol);
} else {
ASSERT(size == hdr->szBlkCol);
blockCol = NULL;
}
}
if (blockCol == NULL || blockCol->cid > colData->cid) {
for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) {
code = tColDataAppendValue(colData, &COL_VAL_NONE(colData->cid, colData->type));
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
ASSERT(blockCol->type == colData->type);
ASSERT(blockCol->flag && blockCol->flag != HAS_NONE);
if (blockCol->flag == HAS_NULL) {
for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) {
code = tColDataAppendValue(colData, &COL_VAL_NULL(blockCol->cid, blockCol->type));
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
int32_t size1 = blockCol->szBitmap + blockCol->szOffset + blockCol->szValue;
code = tRealloc(&reader->config->bufArr[1], size1);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_DATA],
record->blockOffset + record->blockKeySize + hdr->szBlkCol + blockCol->offset,
reader->config->bufArr[1], size1, i > 0 ? 0 : szHint);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDecmprColData(reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData,
&reader->config->bufArr[2]);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
}
#endif
_exit: _exit:
if (code) { if (code) {
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
@ -554,23 +403,21 @@ int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *rec
TARRAY2_CLEAR(columnDataAggArray, NULL); TARRAY2_CLEAR(columnDataAggArray, NULL);
if (record->smaSize > 0) { if (record->smaSize > 0) {
code = tRealloc(&reader->config->bufArr[0], record->smaSize); tBufferClear(&reader->buffers[0]);
TSDB_CHECK_CODE(code, lino, _exit); code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, record->smaSize, &reader->buffers[0], 0);
code = tsdbReadFile(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, reader->config->bufArr[0], record->smaSize, 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// decode sma data // decode sma data
int32_t size = 0; SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
while (size < record->smaSize) { while (br.offset < record->smaSize) {
SColumnDataAgg sma[1]; SColumnDataAgg sma[1];
size += tGetColumnDataAgg(reader->config->bufArr[0] + size, sma); br.offset += tGetColumnDataAgg((uint8_t *)BR_PTR(&br), sma);
code = TARRAY2_APPEND_PTR(columnDataAggArray, sma); code = TARRAY2_APPEND_PTR(columnDataAggArray, sma);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
ASSERT(size == record->smaSize); ASSERT(br.offset == record->smaSize);
} }
_exit: _exit:
@ -623,26 +470,26 @@ int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombB
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
code = tRealloc(&reader->config->bufArr[0], tombBlk->dp->size); tBufferClear(&reader->buffers[0]);
TSDB_CHECK_CODE(code, lino, _exit);
code = code =
tsdbReadFile(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size, 0); tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, tombBlk->dp->size, &reader->buffers[0], 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
int32_t size = 0; int32_t size = 0;
SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
tTombBlockClear(tData); tTombBlockClear(tData);
for (int32_t i = 0; i < ARRAY_SIZE(tData->dataArr); ++i) { tData->numOfRecords = tombBlk->numRec;
code = tsdbDecmprData(reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg, for (int32_t i = 0; i < ARRAY_SIZE(tData->buffers); ++i) {
&reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec, &reader->config->bufArr[2]); SCompressInfo cinfo = {
.cmprAlg = tombBlk->cmprAlg,
.dataType = TSDB_DATA_TYPE_BIGINT,
.originalSize = tombBlk->numRec * sizeof(int64_t),
.compressedSize = tombBlk->size[i],
};
code = tDecompressDataToBuffer(BR_PTR(&br), cinfo.compressedSize, &cinfo, tData->buffers + i, reader->buffers + 1);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
br.offset += tombBlk->size[i];
code = TARRAY2_APPEND_BATCH(&tData->dataArr[i], reader->config->bufArr[1], tombBlk->numRec);
TSDB_CHECK_CODE(code, lino, _exit);
size += tombBlk->size[i];
} }
ASSERT(size == tombBlk->dp->size);
_exit: _exit:
if (code) { if (code) {

View File

@ -35,6 +35,7 @@ extern int32_t tsdbOpenFile(const char *path, STsdb *pTsdb, int32_t flag, STsdbF
extern void tsdbCloseFile(STsdbFD **ppFD); extern void tsdbCloseFile(STsdbFD **ppFD);
extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size); extern int32_t tsdbWriteFile(STsdbFD *pFD, int64_t offset, const uint8_t *pBuf, int64_t size);
extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint); extern int32_t tsdbReadFile(STsdbFD *pFD, int64_t offset, uint8_t *pBuf, int64_t size, int64_t szHint);
extern int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint);
extern int32_t tsdbFsyncFile(STsdbFD *pFD); extern int32_t tsdbFsyncFile(STsdbFD *pFD);
#ifdef __cplusplus #ifdef __cplusplus

View File

@ -414,6 +414,18 @@ _exit:
return code; return code;
} }
int32_t tsdbReadFileToBuffer(STsdbFD *pFD, int64_t offset, int64_t size, SBuffer *buffer, int64_t szHint) {
int32_t code;
code = tBufferEnsureCapacity(buffer, buffer->size + size);
if (code) return code;
code = tsdbReadFile(pFD, offset, (uint8_t *)tBufferGetDataEnd(buffer), size, szHint);
if (code) return code;
buffer->size += size;
return code;
}
int32_t tsdbFsyncFile(STsdbFD *pFD) { int32_t tsdbFsyncFile(STsdbFD *pFD) {
int32_t code = 0; int32_t code = 0;

View File

@ -1591,18 +1591,167 @@ _exit:
return code; return code;
} }
typedef struct SBlockDataCmprInfo { int32_t tBlockDataCompress(SBlockData *bData, SBuffer *buffer, SBuffer *assist) {
// TODO int32_t code = 0;
} SBlockDataCmprInfo; SDiskDataHdr hdr = {0};
int32_t tBlockDataCompress(SBlockData *blockData, SBlockDataCmprInfo *info, SBuffer *buffer, SBuffer *assist) { // SDiskDataHdr
// TODO // br->offset += tGetDiskDataHdr((uint8_t *)tBufferGetDataAt(br->buffer, br->offset), &hdr);
return 0;
tBlockDataReset(bData);
_exit:
return code;
} }
int32_t tBlockDataDecompress(SBufferReader *reader, const SBlockDataCmprInfo *info, SBlockData *blockData) { int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br,
// TODO SBlockData *blockData, SBuffer *assist) {
return 0; int32_t code = 0;
int32_t lino = 0;
SColData *colData;
code = tBlockDataAddColData(blockData, blockCol->cid, blockCol->type, blockCol->cflag, &colData);
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(blockCol->flag != HAS_NONE);
SColDataCompressInfo info = {
.cmprAlg = hdr->cmprAlg,
.columnFlag = blockCol->cflag,
.flag = blockCol->flag,
.dataType = blockCol->type,
.columnId = blockCol->cid,
.numOfData = hdr->nRow,
.bitmapOriginalSize = 0,
.bitmapCompressedSize = blockCol->szBitmap,
.offsetOriginalSize = sizeof(int32_t) * hdr->nRow,
.offsetCompressedSize = blockCol->szOffset,
.dataOriginalSize = blockCol->szOrigin,
.dataCompressedSize = blockCol->szValue,
};
switch (blockCol->flag) {
case (HAS_NONE | HAS_NULL | HAS_VALUE):
info.bitmapOriginalSize = BIT2_SIZE(hdr->nRow);
break;
case (HAS_NONE | HAS_NULL):
case (HAS_NONE | HAS_VALUE):
case (HAS_NULL | HAS_VALUE):
info.bitmapOriginalSize = BIT1_SIZE(hdr->nRow);
break;
}
int32_t totalCompressedSize = blockCol->szBitmap + blockCol->szOffset + blockCol->szValue;
code = tColDataDecompress(BR_PTR(br), totalCompressedSize, &info, colData, assist);
TSDB_CHECK_CODE(code, lino, _exit);
br->offset += totalCompressedSize;
_exit:
return code;
}
int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData,
SBuffer *assist) {
int32_t code = 0;
int32_t lino = 0;
SCompressInfo cinfo;
// uid
if (hdr->szUid > 0) {
cinfo = (SCompressInfo){
.cmprAlg = hdr->cmprAlg,
.dataType = TSDB_DATA_TYPE_BIGINT,
.compressedSize = hdr->szUid,
.originalSize = sizeof(int64_t) * hdr->nRow,
};
code = tRealloc((uint8_t **)&blockData->aUid, cinfo.originalSize);
TSDB_CHECK_CODE(code, lino, _exit);
code = tDecompressData(BR_PTR(br), cinfo.compressedSize, &cinfo, blockData->aUid, cinfo.originalSize, assist);
TSDB_CHECK_CODE(code, lino, _exit);
br->offset += cinfo.compressedSize;
}
// version
cinfo = (SCompressInfo){
.cmprAlg = hdr->cmprAlg,
.dataType = TSDB_DATA_TYPE_BIGINT,
.compressedSize = hdr->szVer,
.originalSize = sizeof(int64_t) * hdr->nRow,
};
code = tRealloc((uint8_t **)&blockData->aVersion, cinfo.originalSize);
TSDB_CHECK_CODE(code, lino, _exit);
code = tDecompressData(BR_PTR(br), cinfo.compressedSize, &cinfo, blockData->aVersion, cinfo.originalSize, assist);
TSDB_CHECK_CODE(code, lino, _exit);
br->offset += cinfo.compressedSize;
// ts
cinfo = (SCompressInfo){
.cmprAlg = hdr->cmprAlg,
.dataType = TSDB_DATA_TYPE_TIMESTAMP,
.compressedSize = hdr->szKey,
.originalSize = sizeof(TSKEY) * hdr->nRow,
};
code = tRealloc((uint8_t **)&blockData->aTSKEY, cinfo.originalSize);
TSDB_CHECK_CODE(code, lino, _exit);
code = tDecompressData(BR_PTR(br), cinfo.compressedSize, &cinfo, blockData->aTSKEY, cinfo.originalSize, assist);
TSDB_CHECK_CODE(code, lino, _exit);
br->offset += cinfo.compressedSize;
// primary keys
if (hdr->numOfPKs > 0) {
SBlockCol blockCol;
for (int i = 0; i < hdr->numOfPKs; i++) {
br->offset += tGetBlockCol((uint8_t *)BR_PTR(br), &blockCol);
ASSERT(blockCol.flag == HAS_VALUE);
ASSERT(blockCol.cflag & COL_IS_KEY);
code = tBlockDataDecompressColData(hdr, &blockCol, br, blockData, assist);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
_exit:
return code;
}
int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist) {
int32_t code = 0;
int32_t lino = 0;
SDiskDataHdr hdr = {0};
SCompressInfo cinfo;
// SDiskDataHdr
br->offset += tGetDiskDataHdr((uint8_t *)BR_PTR(br), &hdr);
tBlockDataReset(blockData);
blockData->suid = hdr.suid;
blockData->uid = hdr.uid;
blockData->nRow = hdr.nRow;
// Key part
code = tBlockDataDecompressKeyPart(&hdr, br, blockData, assist);
TSDB_CHECK_CODE(code, lino, _exit);
// Column part
uint8_t *decodePtr = (uint8_t *)BR_PTR(br);
int32_t totalSize = 0;
br->offset += hdr.szBlkCol;
while (totalSize < hdr.szBlkCol) {
SBlockCol blockCol;
int32_t size = tGetBlockCol(decodePtr, &blockCol);
decodePtr += size;
totalSize += size;
code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
return code;
} }
// SDiskDataHdr ============================== // SDiskDataHdr ==============================

View File

@ -17,39 +17,46 @@
// SDelBlock ---------- // SDelBlock ----------
int32_t tTombBlockInit(STombBlock *tombBlock) { int32_t tTombBlockInit(STombBlock *tombBlock) {
tombBlock->numOfRecords = 0;
for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) {
TARRAY2_INIT(&tombBlock->dataArr[i]); tBufferInit(&tombBlock->buffers[i]);
} }
return 0; return 0;
} }
int32_t tTombBlockDestroy(STombBlock *tombBlock) { int32_t tTombBlockDestroy(STombBlock *tombBlock) {
tombBlock->numOfRecords = 0;
for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) {
TARRAY2_DESTROY(&tombBlock->dataArr[i], NULL); tBufferDestroy(&tombBlock->buffers[i]);
} }
return 0; return 0;
} }
int32_t tTombBlockClear(STombBlock *tombBlock) { int32_t tTombBlockClear(STombBlock *tombBlock) {
tombBlock->numOfRecords = 0;
for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) {
TARRAY2_CLEAR(&tombBlock->dataArr[i], NULL); tBufferClear(&tombBlock->buffers[i]);
} }
return 0; return 0;
} }
int32_t tTombBlockPut(STombBlock *tombBlock, const STombRecord *record) { int32_t tTombBlockPut(STombBlock *tombBlock, const STombRecord *record) {
int32_t code;
for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) {
code = TARRAY2_APPEND(&tombBlock->dataArr[i], record->dataArr[i]); int32_t code = tBufferPutI64(&tombBlock->buffers[i], record->dataArr[i]);
if (code) return code; if (code) return code;
} }
tombBlock->numOfRecords++;
return 0; return 0;
} }
int32_t tTombBlockGet(STombBlock *tombBlock, int32_t idx, STombRecord *record) { int32_t tTombBlockGet(STombBlock *tombBlock, int32_t idx, STombRecord *record) {
if (idx >= TOMB_BLOCK_SIZE(tombBlock)) return TSDB_CODE_OUT_OF_RANGE; if (idx < 0 || idx >= tombBlock->numOfRecords) {
return TSDB_CODE_OUT_OF_RANGE;
}
for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) {
record->dataArr[i] = TARRAY2_GET(&tombBlock->dataArr[i], idx); SBufferReader br = BUFFER_READER_INITIALIZER(sizeof(int64_t) * idx, &tombBlock->buffers[i]);
tBufferGetI64(&br, &record->dataArr[i]);
} }
return 0; return 0;
} }
@ -225,12 +232,14 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) {
ASSERT(record->firstKey.key.numOfPKs == record->lastKey.key.numOfPKs); ASSERT(record->firstKey.key.numOfPKs == record->lastKey.key.numOfPKs);
if (brinBlock->numOfRecords == 0) { if (brinBlock->numOfRecords == 0) { // the first row
brinBlock->numOfPKs = record->firstKey.key.numOfPKs; brinBlock->numOfPKs = record->firstKey.key.numOfPKs;
} else if (brinBlock->numOfPKs != record->firstKey.key.numOfPKs) {
// if the number of primary keys are not the same,
// return an error code and the caller should handle it
return TSDB_CODE_INVALID_PARA;
} }
ASSERT(brinBlock->numOfPKs == record->firstKey.key.numOfPKs);
code = tBufferPutI64(&brinBlock->suids, record->suid); code = tBufferPutI64(&brinBlock->suids, record->suid);
if (code) return code; if (code) return code;
@ -243,22 +252,12 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) {
code = tBufferPutI64(&brinBlock->firstKeyVersions, record->firstKey.version); code = tBufferPutI64(&brinBlock->firstKeyVersions, record->firstKey.version);
if (code) return code; if (code) return code;
for (int32_t i = 0; i < record->firstKey.key.numOfPKs; ++i) {
code = tValueColumnAppend(&brinBlock->firstKeyPKs[i], &record->firstKey.key.pks[i]);
if (code) return code;
}
code = tBufferPutI64(&brinBlock->lastKeyTimestamps, record->lastKey.key.ts); code = tBufferPutI64(&brinBlock->lastKeyTimestamps, record->lastKey.key.ts);
if (code) return code; if (code) return code;
code = tBufferPutI64(&brinBlock->lastKeyVersions, record->lastKey.version); code = tBufferPutI64(&brinBlock->lastKeyVersions, record->lastKey.version);
if (code) return code; if (code) return code;
for (int32_t i = 0; i < record->lastKey.key.numOfPKs; ++i) {
code = tValueColumnAppend(&brinBlock->lastKeyPKs[i], &record->lastKey.key.pks[i]);
if (code) return code;
}
code = tBufferPutI64(&brinBlock->minVers, record->minVer); code = tBufferPutI64(&brinBlock->minVers, record->minVer);
if (code) return code; if (code) return code;
@ -286,6 +285,18 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) {
code = tBufferPutI32(&brinBlock->counts, record->count); code = tBufferPutI32(&brinBlock->counts, record->count);
if (code) return code; if (code) return code;
if (brinBlock->numOfPKs > 0) {
for (int32_t i = 0; i < brinBlock->numOfPKs; ++i) {
code = tValueColumnAppend(&brinBlock->firstKeyPKs[i], &record->firstKey.key.pks[i]);
if (code) return code;
}
for (int32_t i = 0; i < brinBlock->numOfPKs; ++i) {
code = tValueColumnAppend(&brinBlock->lastKeyPKs[i], &record->lastKey.key.pks[i]);
if (code) return code;
}
}
brinBlock->numOfRecords++; brinBlock->numOfRecords++;
return 0; return 0;

View File

@ -35,14 +35,17 @@ typedef union {
}; };
} STombRecord; } STombRecord;
typedef union { typedef struct {
TARRAY2(int64_t) dataArr[TOMB_RECORD_ELEM_NUM]; int32_t numOfRecords;
struct { union {
TARRAY2(int64_t) suid[1]; SBuffer buffers[TOMB_RECORD_ELEM_NUM];
TARRAY2(int64_t) uid[1]; struct {
TARRAY2(int64_t) version[1]; SBuffer suids;
TARRAY2(int64_t) skey[1]; SBuffer uids;
TARRAY2(int64_t) ekey[1]; SBuffer versions;
SBuffer skeys;
SBuffer ekeys;
};
}; };
} STombBlock; } STombBlock;
@ -60,7 +63,7 @@ typedef struct {
typedef TARRAY2(STombBlk) TTombBlkArray; typedef TARRAY2(STombBlk) TTombBlkArray;
#define TOMB_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid) #define TOMB_BLOCK_SIZE(db) ((db)->numOfRecords)
int32_t tTombBlockInit(STombBlock *tombBlock); int32_t tTombBlockInit(STombBlock *tombBlock);
int32_t tTombBlockDestroy(STombBlock *tombBlock); int32_t tTombBlockDestroy(STombBlock *tombBlock);
@ -137,21 +140,21 @@ typedef struct {
union { union {
SBuffer buffers[15]; SBuffer buffers[15];
struct { struct {
SBuffer suids; SBuffer suids; // int64_t
SBuffer uids; SBuffer uids; // int64_t
SBuffer firstKeyTimestamps; SBuffer firstKeyTimestamps; // int64_t
SBuffer firstKeyVersions; SBuffer firstKeyVersions; // int64_t
SBuffer lastKeyTimestamps; SBuffer lastKeyTimestamps; // int64_t
SBuffer lastKeyVersions; SBuffer lastKeyVersions; // int64_t
SBuffer minVers; SBuffer minVers; // int64_t
SBuffer maxVers; SBuffer maxVers; // int64_t
SBuffer blockOffsets; SBuffer blockOffsets; // int64_t
SBuffer smaOffsets; SBuffer smaOffsets; // int64_t
SBuffer blockSizes; SBuffer blockSizes; // int32_t
SBuffer blockKeySizes; SBuffer blockKeySizes; // int32_t
SBuffer smaSizes; SBuffer smaSizes; // int32_t
SBuffer numRows; SBuffer numRows; // int32_t
SBuffer counts; SBuffer counts; // int32_t
}; };
}; };
SValueColumn firstKeyPKs[TD_MAX_PK_COLS]; SValueColumn firstKeyPKs[TD_MAX_PK_COLS];