more code

This commit is contained in:
Hongze Cheng 2024-03-07 14:40:42 +08:00
parent b764d51fae
commit f6a5144972
2 changed files with 100 additions and 85 deletions

View File

@ -29,7 +29,7 @@ struct STsdbSnapReader {
int64_t ever;
int8_t type;
SBuffer buffers[5];
SBuffer buffers[10];
SSkmInfo skmTb[1];
TFileSetRangeArray* fsrArr;
@ -554,7 +554,7 @@ struct STsdbSnapWriter {
int32_t szPage;
int64_t compactVersion;
int64_t now;
SBuffer buffers[5];
SBuffer buffers[10];
TFileSetArray* fsetArr;
TFileOpArray fopArr[1];

View File

@ -29,8 +29,7 @@ struct SSttFileReader {
TSttBlkArray sttBlkArray[1];
TStatisBlkArray statisBlkArray[1];
TTombBlkArray tombBlkArray[1];
uint8_t *bufArr[5]; // TODO: remove here
SBuffer local[5];
SBuffer local[10];
SBuffer *buffers;
};
@ -178,13 +177,16 @@ int32_t tsdbSttFileReadBlockData(SSttFileReader *reader, const SSttBlk *sttBlk,
int32_t code = 0;
int32_t lino = 0;
SBuffer *buffer0 = reader->buffers + 0;
SBuffer *assist = reader->buffers + 1;
// load data
tBufferClear(&reader->buffers[0]);
code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset, sttBlk->bInfo.szBlock, &reader->buffers[0], 0);
tBufferClear(buffer0);
code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset, sttBlk->bInfo.szBlock, buffer0, 0);
TSDB_CHECK_CODE(code, lino, _exit);
SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
code = tBlockDataDecompress(&br, bData, reader->buffers + 1);
SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
code = tBlockDataDecompress(&br, bData, assist);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
@ -194,26 +196,23 @@ _exit:
return code;
}
extern int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData,
SBuffer *assist);
extern int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br,
SBlockData *blockData, SBuffer *assist);
int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *sttBlk, SBlockData *bData,
STSchema *pTSchema, int16_t cids[], int32_t ncid) {
int32_t code = 0;
int32_t lino = 0;
int32_t n = 0;
int32_t code = 0;
int32_t lino = 0;
SDiskDataHdr hdr;
SBlockCol primaryKeyBlockCols[TD_MAX_PK_COLS];
SBuffer *buffer0 = reader->buffers + 0;
SBuffer *buffer1 = reader->buffers + 1;
SBuffer *assist = reader->buffers + 2;
// load key part
tBufferClear(&reader->buffers[0]);
code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset, sttBlk->bInfo.szKey, &reader->buffers[0], 0);
tBufferClear(buffer0);
code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset, sttBlk->bInfo.szKey, buffer0, 0);
TSDB_CHECK_CODE(code, lino, _exit);
// decode header
SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
code = tGetDiskDataHdr(&br, &hdr);
TSDB_CHECK_CODE(code, lino, _exit);
@ -226,35 +225,41 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
bData->nRow = hdr.nRow;
// key part
code = tBlockDataDecompressKeyPart(&hdr, &br, bData, reader->buffers + 1);
code = tBlockDataDecompressKeyPart(&hdr, &br, bData, assist);
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(br.offset == reader->buffers[0].size);
ASSERT(br.offset == buffer0->size);
if (ncid == 0) {
bool loadExtra = false;
for (int i = 0; i < ncid; i++) {
if (tBlockDataGetColData(bData, cids[i]) == NULL) {
loadExtra = true;
break;
}
}
if (!loadExtra) {
goto _exit;
}
// load SBlockCol part
tBufferClear(&reader->buffers[0]);
code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey, hdr.szBlkCol, &reader->buffers[0],
0);
tBufferClear(buffer0);
code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey, hdr.szBlkCol, buffer0, 0);
TSDB_CHECK_CODE(code, lino, _exit);
// load each column
SBlockCol blockCol = {
.cid = 0,
};
br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
br = BUFFER_READER_INITIALIZER(0, buffer0);
for (int32_t i = 0; i < ncid; i++) {
int16_t cid = cids[i];
if (tBlockDataGetColData(bData, cid)) {
// this column has been loaded
if (tBlockDataGetColData(bData, cid)) { // already loaded
continue;
}
while (cid > blockCol.cid) {
if (br.offset >= reader->buffers[0].size) {
if (br.offset >= buffer0->size) {
blockCol.cid = INT16_MAX;
break;
}
@ -264,19 +269,32 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
}
if (cid < blockCol.cid) {
// this column as NONE
continue;
const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid);
ASSERT(tcol);
SBlockCol none = {
.cid = cid,
.type = tcol->type,
.cflag = tcol->flags,
.flag = HAS_NONE,
.szOrigin = 0,
.szBitmap = 0,
.szOffset = 0,
.szValue = 0,
.offset = 0,
};
code = tBlockDataDecompressColData(&hdr, &none, &br, bData, assist);
TSDB_CHECK_CODE(code, lino, _exit);
} else if (cid == blockCol.cid) {
// load from file
tBufferClear(&reader->buffers[1]);
tBufferClear(buffer1);
code =
tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey + hdr.szBlkCol + blockCol.offset,
blockCol.szBitmap + blockCol.szOffset + blockCol.szValue, &reader->buffers[1], 0);
blockCol.szBitmap + blockCol.szOffset + blockCol.szValue, buffer1, 0);
TSDB_CHECK_CODE(code, lino, _exit);
// decode the buffer
SBufferReader br1 = BUFFER_READER_INITIALIZER(0, &reader->buffers[1]);
code = tBlockDataDecompressColData(&hdr, &blockCol, &br1, bData, reader->buffers + 2);
SBufferReader br1 = BUFFER_READER_INITIALIZER(0, buffer1);
code = tBlockDataDecompressColData(&hdr, &blockCol, &br1, bData, assist);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
@ -292,14 +310,17 @@ int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *tombBlk
int32_t code = 0;
int32_t lino = 0;
SBuffer *buffer0 = reader->buffers + 0;
SBuffer *assist = reader->buffers + 1;
// load
tBufferClear(&reader->buffers[0]);
code = tsdbReadFileToBuffer(reader->fd, tombBlk->dp->offset, tombBlk->dp->size, &reader->buffers[0], 0);
tBufferClear(buffer0);
code = tsdbReadFileToBuffer(reader->fd, tombBlk->dp->offset, tombBlk->dp->size, buffer0, 0);
TSDB_CHECK_CODE(code, lino, _exit);
// decode
int32_t size = 0;
SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
tTombBlockClear(tombBlock);
tombBlock->numOfRecords = tombBlk->numRec;
for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->buffers); ++i) {
@ -309,11 +330,12 @@ int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *tombBlk
.originalSize = tombBlk->numRec * sizeof(int64_t),
.compressedSize = tombBlk->size[i],
};
code = tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tombBlock->buffers + i, reader->buffers + 1);
code = tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tombBlock->buffers + i, assist);
TSDB_CHECK_CODE(code, lino, _exit);
br.offset += tombBlk->size[i];
}
ASSERT(br.offset == tombBlk->dp->size);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
@ -324,20 +346,20 @@ _exit:
int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *statisBlock) {
int32_t code = 0;
int32_t lino = 0;
int64_t size = 0;
// load data from file
code = tBufferEnsureCapacity(&reader->buffers[0], statisBlk->dp->size);
TSDB_CHECK_CODE(code, lino, _exit);
SBuffer *buffer0 = reader->buffers + 0;
SBuffer *assist = reader->buffers + 1;
code = tsdbReadFile(reader->fd, statisBlk->dp->offset, reader->buffers[0].data, statisBlk->dp->size, 0);
// load data
tBufferClear(buffer0);
code = tsdbReadFileToBuffer(reader->fd, statisBlk->dp->offset, statisBlk->dp->size, buffer0, 0);
TSDB_CHECK_CODE(code, lino, _exit);
// decode data
tStatisBlockClear(statisBlock);
statisBlock->numOfPKs = statisBlk->numOfPKs;
statisBlock->numOfRecords = statisBlk->numRec;
SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) {
SCompressInfo info = {
.dataType = TSDB_DATA_TYPE_BIGINT,
@ -346,16 +368,14 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta
.originalSize = statisBlk->numRec * sizeof(int64_t),
};
code = tDecompressDataToBuffer(tBufferGetDataAt(&reader->buffers[0], size), &info, &statisBlock->buffers[i],
&reader->buffers[1]);
code = tDecompressDataToBuffer(BR_PTR(&br), &info, &statisBlock->buffers[i], assist);
TSDB_CHECK_CODE(code, lino, _exit);
size += statisBlk->size[i];
br.offset += statisBlk->size[i];
}
if (statisBlk->numOfPKs > 0) {
SValueColumnCompressInfo firstKeyInfos[TD_MAX_PK_COLS];
SValueColumnCompressInfo lastKeyInfos[TD_MAX_PK_COLS];
SBufferReader br = BUFFER_READER_INITIALIZER(size, &reader->buffers[0]);
// decode compress info
for (int32_t i = 0; i < statisBlk->numOfPKs; i++) {
@ -368,25 +388,21 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta
TSDB_CHECK_CODE(code, lino, _exit);
}
size = br.offset;
// decode value columns
for (int32_t i = 0; i < statisBlk->numOfPKs; i++) {
code = tValueColumnDecompress(tBufferGetDataAt(&reader->buffers[0], size), &firstKeyInfos[i],
&statisBlock->firstKeyPKs[i], &reader->buffers[1]);
code = tValueColumnDecompress(BR_PTR(&br), firstKeyInfos + i, &statisBlock->firstKeyPKs[i], assist);
TSDB_CHECK_CODE(code, lino, _exit);
size += firstKeyInfos[i].dataCompressedSize;
br.offset += (firstKeyInfos[i].dataCompressedSize + firstKeyInfos[i].offsetCompressedSize);
}
for (int32_t i = 0; i < statisBlk->numOfPKs; i++) {
code = tValueColumnDecompress(tBufferGetDataAt(&reader->buffers[0], size), &lastKeyInfos[i],
&statisBlock->lastKeyPKs[i], &reader->buffers[1]);
code = tValueColumnDecompress(BR_PTR(&br), &lastKeyInfos[i], &statisBlock->lastKeyPKs[i], assist);
TSDB_CHECK_CODE(code, lino, _exit);
size += lastKeyInfos[i].dataCompressedSize;
br.offset += (lastKeyInfos[i].dataCompressedSize + lastKeyInfos[i].offsetCompressedSize);
}
}
ASSERT(size == statisBlk->dp->size);
ASSERT(br.offset == buffer0->size);
_exit:
if (code) {
@ -418,7 +434,7 @@ struct SSttFileWriter {
// helper data
SSkmInfo skmTb[1];
SSkmInfo skmRow[1];
SBuffer local[5];
SBuffer local[10];
SBuffer *buffers;
};
@ -488,16 +504,19 @@ _exit:
}
static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
int32_t code = 0;
int32_t lino = 0;
if (writer->staticBlock->numOfRecords == 0) return 0;
int32_t code = 0;
int32_t lino = 0;
SBuffer *buffer0 = writer->buffers + 0;
SBuffer *buffer1 = writer->buffers + 1;
SBuffer *assist = writer->buffers + 2;
STbStatisRecord record;
STbStatisBlock *statisBlock = writer->staticBlock;
SStatisBlk statisBlk = {0};
if (statisBlock->numOfRecords == 0) {
return 0;
}
statisBlk.dp->offset = writer->file->size;
statisBlk.dp->size = 0;
statisBlk.numRec = statisBlock->numOfRecords;
@ -520,12 +539,11 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
.originalSize = statisBlock->buffers[i].size,
};
tBufferClear(&writer->buffers[0]);
code = tCompressDataToBuffer(tBufferGetData(&statisBlock->buffers[i]), &info, &writer->buffers[0],
&writer->buffers[1]);
tBufferClear(buffer0);
code = tCompressDataToBuffer(statisBlock->buffers[i].data, &info, buffer0, assist);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbWriteFile(writer->fd, writer->file->size, tBufferGetData(&writer->buffers[0]), info.compressedSize);
code = tsdbWriteFile(writer->fd, writer->file->size, buffer0->data, info.compressedSize);
TSDB_CHECK_CODE(code, lino, _exit);
statisBlk.size[i] = info.compressedSize;
@ -537,35 +555,32 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) {
if (statisBlk.numOfPKs > 0) {
SValueColumnCompressInfo compressInfo = {.cmprAlg = statisBlk.cmprAlg};
tBufferClear(&writer->buffers[0]);
tBufferClear(&writer->buffers[1]);
tBufferClear(buffer0);
tBufferClear(buffer0);
for (int32_t i = 0; i < statisBlk.numOfPKs; i++) {
code =
tValueColumnCompress(&statisBlock->firstKeyPKs[i], &compressInfo, &writer->buffers[1], &writer->buffers[2]);
code = tValueColumnCompress(&statisBlock->firstKeyPKs[i], &compressInfo, buffer1, assist);
TSDB_CHECK_CODE(code, lino, _exit);
code = tValueColumnCompressInfoEncode(&compressInfo, &writer->buffers[0]);
code = tValueColumnCompressInfoEncode(&compressInfo, buffer0);
TSDB_CHECK_CODE(code, lino, _exit);
}
for (int32_t i = 0; i < statisBlk.numOfPKs; i++) {
code = tValueColumnCompress(&statisBlock->lastKeyPKs[i], &compressInfo, &writer->buffers[1], &writer->buffers[2]);
code = tValueColumnCompress(&statisBlock->lastKeyPKs[i], &compressInfo, buffer1, assist);
TSDB_CHECK_CODE(code, lino, _exit);
code = tValueColumnCompressInfoEncode(&compressInfo, &writer->buffers[0]);
code = tValueColumnCompressInfoEncode(&compressInfo, buffer0);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tsdbWriteFile(writer->fd, writer->file->size, writer->buffers[0].data, writer->buffers[0].size);
code = tsdbWriteFile(writer->fd, writer->file->size, buffer0->data, buffer0->size);
TSDB_CHECK_CODE(code, lino, _exit);
writer->file->size += writer->buffers[0].size;
statisBlk.dp->size += writer->buffers[0].size;
writer->file->size += buffer0->size;
statisBlk.dp->size += buffer0->size;
code = tsdbWriteFile(writer->fd, writer->file->size, writer->buffers[1].data, writer->buffers[1].size);
code = tsdbWriteFile(writer->fd, writer->file->size, buffer1->data, buffer1->size);
TSDB_CHECK_CODE(code, lino, _exit);
writer->file->size += writer->buffers[1].size;
statisBlk.dp->size += writer->buffers[1].size;
writer->file->size += buffer1->size;
statisBlk.dp->size += buffer1->size;
}
code = TARRAY2_APPEND_PTR(writer->statisBlkArray, &statisBlk);