more code

This commit is contained in:
Hongze Cheng 2024-03-07 14:02:13 +08:00
parent 456db5b4c3
commit b764d51fae
1 changed files with 73 additions and 65 deletions

View File

@ -405,17 +405,18 @@ _exit:
int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *record, int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *record,
TColumnDataAggArray *columnDataAggArray) { TColumnDataAggArray *columnDataAggArray) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SBuffer *buffer = reader->buffers + 0;
TARRAY2_CLEAR(columnDataAggArray, NULL); TARRAY2_CLEAR(columnDataAggArray, NULL);
if (record->smaSize > 0) { if (record->smaSize > 0) {
tBufferClear(&reader->buffers[0]); tBufferClear(buffer);
code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, record->smaSize, &reader->buffers[0], 0); code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, record->smaSize, buffer, 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// decode sma data // decode sma data
SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer);
while (br.offset < record->smaSize) { while (br.offset < record->smaSize) {
SColumnDataAgg sma[1]; SColumnDataAgg sma[1];
@ -478,13 +479,15 @@ int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombB
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
tBufferClear(&reader->buffers[0]); SBuffer *buffer0 = reader->buffers + 0;
code = SBuffer *assist = reader->buffers + 1;
tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, tombBlk->dp->size, &reader->buffers[0], 0);
tBufferClear(buffer0);
code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, tombBlk->dp->size, buffer0, 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
int32_t size = 0; int32_t size = 0;
SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
tTombBlockClear(tData); tTombBlockClear(tData);
tData->numOfRecords = tombBlk->numRec; tData->numOfRecords = tombBlk->numRec;
for (int32_t i = 0; i < ARRAY_SIZE(tData->buffers); ++i) { for (int32_t i = 0; i < ARRAY_SIZE(tData->buffers); ++i) {
@ -494,7 +497,7 @@ int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombB
.originalSize = tombBlk->numRec * sizeof(int64_t), .originalSize = tombBlk->numRec * sizeof(int64_t),
.compressedSize = tombBlk->size[i], .compressedSize = tombBlk->size[i],
}; };
code = tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tData->buffers + i, reader->buffers + 1); code = tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tData->buffers + i, assist);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
br.offset += tombBlk->size[i]; br.offset += tombBlk->size[i];
} }
@ -512,7 +515,7 @@ struct SDataFileWriter {
SSkmInfo skmTb[1]; SSkmInfo skmTb[1];
SSkmInfo skmRow[1]; SSkmInfo skmRow[1];
SBuffer local[5]; SBuffer local[10];
SBuffer *buffers; SBuffer *buffers;
struct { struct {
@ -710,7 +713,10 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl
TBrinBlkArray *brinBlkArray, SBuffer *buffers, SVersionRange *range) { TBrinBlkArray *brinBlkArray, SBuffer *buffers, SVersionRange *range) {
if (brinBlock->numOfRecords == 0) return 0; if (brinBlock->numOfRecords == 0) return 0;
int32_t code; int32_t code;
SBuffer *buffer0 = buffers + 0;
SBuffer *buffer1 = buffers + 1;
SBuffer *assist = buffers + 2;
SBrinBlk brinBlk = { SBrinBlk brinBlk = {
.dp[0] = .dp[0] =
@ -747,28 +753,34 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl
tsdbWriterUpdVerRange(range, brinBlk.minVer, brinBlk.maxVer); tsdbWriterUpdVerRange(range, brinBlk.minVer, brinBlk.maxVer);
// write to file // write to file
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) { for (int32_t i = 0; i < 10; ++i) {
SBuffer *bf = &brinBlock->buffers[i];
SCompressInfo info = { SCompressInfo info = {
.cmprAlg = cmprAlg, .cmprAlg = cmprAlg,
.originalSize = bf->size, .dataType = TSDB_DATA_TYPE_BIGINT,
.originalSize = brinBlock->buffers[i].size,
}; };
if (tBufferGetSize(bf) == 8 * brinBlock->numOfRecords) { tBufferClear(buffer0);
info.dataType = TSDB_DATA_TYPE_BIGINT; code = tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist);
} else if (tBufferGetSize(bf) == 4 * brinBlock->numOfRecords) {
info.dataType = TSDB_DATA_TYPE_INT;
} else {
ASSERT(0);
}
tBufferClear(&buffers[0]);
code = tCompressDataToBuffer(tBufferGetData(bf), &info, buffers[0].data, &buffers[1]);
if (code) return code; if (code) return code;
code = tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size);
code = tsdbWriteFile(fd, *fileSize, buffers[0].data, buffers[0].size);
if (code) return code; if (code) return code;
brinBlk.size[i] = info.compressedSize;
brinBlk.dp->size += info.compressedSize;
*fileSize += info.compressedSize;
}
for (int32_t i = 10; i < 15; ++i) {
SCompressInfo info = {
.cmprAlg = cmprAlg,
.dataType = TSDB_DATA_TYPE_INT,
.originalSize = brinBlock->buffers[i].size,
};
tBufferClear(buffer0);
code = tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist);
if (code) return code;
code = tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size);
if (code) return code;
brinBlk.size[i] = info.compressedSize; brinBlk.size[i] = info.compressedSize;
brinBlk.dp->size += info.compressedSize; brinBlk.dp->size += info.compressedSize;
*fileSize += info.compressedSize; *fileSize += info.compressedSize;
@ -776,37 +788,28 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl
// write primary keys to file // write primary keys to file
if (brinBlock->numOfPKs > 0) { if (brinBlock->numOfPKs > 0) {
for (int i = 0; i < 2; i++) { tBufferClear(buffer0);
tBufferClear(&buffers[i]); tBufferClear(buffer1);
}
// encode // encode
for (int i = 0; i < brinBlock->numOfPKs; i++) { for (int i = 0; i < brinBlock->numOfPKs; i++) {
SValueColumnCompressInfo info = { SValueColumnCompressInfo info = {.cmprAlg = cmprAlg};
.cmprAlg = cmprAlg, if ((code = tValueColumnCompress(&brinBlock->firstKeyPKs[i], &info, buffer1, assist))) return code;
}; if ((code = tValueColumnCompressInfoEncode(&info, buffer0))) return code;
code = tValueColumnCompress(&brinBlock->firstKeyPKs[i], &info, &buffers[1], &buffers[2]);
if (code) return code;
code = tValueColumnCompressInfoEncode(&info, &buffers[0]);
if (code) return code;
} }
for (int i = 0; i < brinBlock->numOfPKs; i++) { for (int i = 0; i < brinBlock->numOfPKs; i++) {
SValueColumnCompressInfo info = { SValueColumnCompressInfo info = {.cmprAlg = cmprAlg};
.cmprAlg = cmprAlg, if ((code = tValueColumnCompress(&brinBlock->lastKeyPKs[i], &info, buffer1, assist))) return code;
}; if ((code = tValueColumnCompressInfoEncode(&info, buffer0))) return code;
code = tValueColumnCompress(&brinBlock->lastKeyPKs[i], &info, &buffers[1], &buffers[2]);
if (code) return code;
code = tValueColumnCompressInfoEncode(&info, &buffers[0]);
if (code) return code;
} }
// write to file // write to file
for (int i = 0; i < 2; i++) { if ((code = tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size))) return code;
code = tsdbWriteFile(fd, *fileSize, buffers[i].data, buffers[i].size); *fileSize += buffer0->size;
if (code) return code; brinBlk.dp->size += buffer0->size;
brinBlk.dp->size += buffers[i].size; if ((code = tsdbWriteFile(fd, *fileSize, buffer1->data, buffer1->size))) return code;
*fileSize += buffers[i].size; *fileSize += buffer1->size;
} brinBlk.dp->size += buffer1->size;
} }
// append to brinBlkArray // append to brinBlkArray
@ -870,8 +873,10 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
ASSERT(bData->uid); ASSERT(bData->uid);
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SBuffer *buffers = writer->buffers;
SBuffer *assist = writer->buffers + 4;
SBrinRecord record[1] = {{ SBrinRecord record[1] = {{
.suid = bData->suid, .suid = bData->suid,
@ -905,21 +910,21 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
tsdbWriterUpdVerRange(&writer->ctx->range, record->minVer, record->maxVer); tsdbWriterUpdVerRange(&writer->ctx->range, record->minVer, record->maxVer);
// to .data file // to .data file
code = tBlockDataCompress(bData, writer->config->cmprAlg, writer->buffers, &writer->buffers[4]); code = tBlockDataCompress(bData, writer->config->cmprAlg, buffers, assist);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
record->blockKeySize = writer->buffers[0].size + writer->buffers[1].size; record->blockKeySize = buffers[0].size + buffers[1].size;
record->blockSize = record->blockKeySize + writer->buffers[2].size + writer->buffers[3].size; record->blockSize = record->blockKeySize + buffers[2].size + buffers[3].size;
for (int i = 0; i < 4; i++) { for (int i = 0; i < 4; i++) {
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size, writer->buffers[i].data, code = tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size, buffers[i].data,
tBufferGetSize(&writer->buffers[i])); buffers[i].size);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
writer->files[TSDB_FTYPE_DATA].size += tBufferGetSize(&writer->buffers[i]); writer->files[TSDB_FTYPE_DATA].size += buffers[i].size;
} }
// to .sma file // to .sma file
tBufferClear(&writer->buffers[0]); tBufferClear(&buffers[0]);
for (int32_t i = 0; i < bData->nColData; ++i) { for (int32_t i = 0; i < bData->nColData; ++i) {
SColData *colData = bData->aColData + i; SColData *colData = bData->aColData + i;
if ((colData->cflag & COL_SMA_ON) == 0 || ((colData->flag & HAS_VALUE) == 0)) continue; if ((colData->cflag & COL_SMA_ON) == 0 || ((colData->flag & HAS_VALUE) == 0)) continue;
@ -927,13 +932,13 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
SColumnDataAgg sma[1] = {{.colId = colData->cid}}; SColumnDataAgg sma[1] = {{.colId = colData->cid}};
tColDataCalcSMA[colData->type](colData, &sma->sum, &sma->max, &sma->min, &sma->numOfNull); tColDataCalcSMA[colData->type](colData, &sma->sum, &sma->max, &sma->min, &sma->numOfNull);
code = tPutColumnDataAgg(&writer->buffers[0], sma); code = tPutColumnDataAgg(&buffers[0], sma);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
record->smaSize = tBufferGetSize(&writer->buffers[0]); record->smaSize = buffers[0].size;
if (record->smaSize > 0) { if (record->smaSize > 0) {
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record->smaOffset, writer->buffers[0].data, record->smaSize); code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record->smaOffset, buffers[0].data, record->smaSize);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
writer->files[TSDB_FTYPE_SMA].size += record->smaSize; writer->files[TSDB_FTYPE_SMA].size += record->smaSize;
} }
@ -1214,6 +1219,9 @@ int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAl
if (TOMB_BLOCK_SIZE(tombBlock) == 0) return 0; if (TOMB_BLOCK_SIZE(tombBlock) == 0) return 0;
SBuffer *buffer0 = buffers + 0;
SBuffer *assist = buffers + 1;
STombBlk tombBlk = { STombBlk tombBlk = {
.dp[0] = .dp[0] =
{ {
@ -1248,17 +1256,17 @@ int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAl
tsdbWriterUpdVerRange(range, tombBlk.minVer, tombBlk.maxVer); tsdbWriterUpdVerRange(range, tombBlk.minVer, tombBlk.maxVer);
for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->buffers); i++) { for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->buffers); i++) {
tBufferClear(&buffers[0]); tBufferClear(buffer0);
SCompressInfo cinfo = { SCompressInfo cinfo = {
.cmprAlg = cmprAlg, .cmprAlg = cmprAlg,
.dataType = TSDB_DATA_TYPE_BIGINT, .dataType = TSDB_DATA_TYPE_BIGINT,
.originalSize = tombBlock->buffers[i].size, .originalSize = tombBlock->buffers[i].size,
}; };
code = tCompressDataToBuffer(tombBlock->buffers[i].data, &cinfo, &buffers[0], &buffers[1]); code = tCompressDataToBuffer(tombBlock->buffers[i].data, &cinfo, buffer0, assist);
if (code) return code; if (code) return code;
code = tsdbWriteFile(fd, *fileSize, buffers[0].data, buffers[0].size); code = tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size);
if (code) return code; if (code) return code;
tombBlk.size[i] = cinfo.compressedSize; tombBlk.size[i] = cinfo.compressedSize;