more code

This commit is contained in:
Hongze Cheng 2024-03-05 17:06:14 +08:00
parent a728f2b5c9
commit fb4e17a0cb
1 changed files with 51 additions and 40 deletions

View File

@ -94,7 +94,7 @@ static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) {
.tsdb = reader->tsdb, .tsdb = reader->tsdb,
.szPage = reader->tsdb->pVnode->config.tsdbPageSize, .szPage = reader->tsdb->pVnode->config.tsdbPageSize,
.file = fobj->f[0], .file = fobj->f[0],
.bufArr = reader->aBuf, .buffers = reader->buffers,
}; };
code = tsdbSttFileReaderOpen(fobj->fname, &config, &sttReader); code = tsdbSttFileReaderOpen(fobj->fname, &config, &sttReader);
@ -247,11 +247,14 @@ static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
int32_t aBufN[5] = {0}; code = tBlockDataCompress(reader->blockData, NO_COMPRESSION, reader->buffers, reader->buffers + 4);
code = tCmprBlockData(reader->blockData, NO_COMPRESSION, NULL, NULL, reader->aBuf, aBufN);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// TSDB_CHECK_CODE(code, lino, _exit);
int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3]; int32_t size = 0;
for (int i = 0; i < 4; i++) {
size += reader->buffers[i].size;
}
*data = taosMemoryMalloc(sizeof(SSnapDataHdr) + size); *data = taosMemoryMalloc(sizeof(SSnapDataHdr) + size);
if (*data == NULL) { if (*data == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY; code = TSDB_CODE_OUT_OF_MEMORY;
@ -259,16 +262,13 @@ static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) {
} }
SSnapDataHdr* pHdr = (SSnapDataHdr*)*data; SSnapDataHdr* pHdr = (SSnapDataHdr*)*data;
uint8_t* pBuf = pHdr->data;
pHdr->type = reader->type; pHdr->type = reader->type;
pHdr->size = size; pHdr->size = size;
for (int i = 0; i < 4; i++) {
memcpy(pHdr->data, reader->aBuf[3], aBufN[3]); memcpy(pBuf, reader->buffers[i].data, reader->buffers[i].size);
memcpy(pHdr->data + aBufN[3], reader->aBuf[2], aBufN[2]); pBuf += reader->buffers[i].size;
if (aBufN[1]) {
memcpy(pHdr->data + aBufN[3] + aBufN[2], reader->aBuf[1], aBufN[1]);
}
if (aBufN[0]) {
memcpy(pHdr->data + aBufN[3] + aBufN[2] + aBufN[1], reader->aBuf[0], aBufN[0]);
} }
_exit: _exit:
@ -355,8 +355,8 @@ static int32_t tsdbSnapCmprTombData(STsdbSnapReader* reader, uint8_t** data) {
int32_t lino = 0; int32_t lino = 0;
int64_t size = 0; int64_t size = 0;
for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->dataArr); i++) { for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) {
size += TARRAY2_DATA_LEN(reader->tombBlock->dataArr + i); size += reader->tombBlock->buffers[i].size;
} }
data[0] = taosMemoryMalloc(size + sizeof(SSnapDataHdr)); data[0] = taosMemoryMalloc(size + sizeof(SSnapDataHdr));
@ -370,9 +370,9 @@ static int32_t tsdbSnapCmprTombData(STsdbSnapReader* reader, uint8_t** data) {
hdr->size = size; hdr->size = size;
uint8_t* tdata = hdr->data; uint8_t* tdata = hdr->data;
for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->dataArr); i++) { for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) {
memcpy(tdata, TARRAY2_DATA(reader->tombBlock->dataArr + i), TARRAY2_DATA_LEN(reader->tombBlock->dataArr + i)); memcpy(tdata, reader->tombBlock->buffers[i].data, reader->tombBlock->buffers[i].size);
tdata += TARRAY2_DATA_LEN(reader->tombBlock->dataArr + i); tdata += reader->tombBlock->buffers[i].size;
} }
_exit: _exit:
@ -475,8 +475,8 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** reader) {
tsdbFSDestroyRefRangedSnapshot(&reader[0]->fsrArr); tsdbFSDestroyRefRangedSnapshot(&reader[0]->fsrArr);
tDestroyTSchema(reader[0]->skmTb->pTSchema); tDestroyTSchema(reader[0]->skmTb->pTSchema);
for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->aBuf); ++i) { for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->buffers); ++i) {
tFree(reader[0]->aBuf[i]); tBufferDestroy(reader[0]->buffers + i);
} }
taosMemoryFree(reader[0]); taosMemoryFree(reader[0]);
@ -542,19 +542,19 @@ _exit:
// STsdbSnapWriter ======================================== // STsdbSnapWriter ========================================
struct STsdbSnapWriter { struct STsdbSnapWriter {
STsdb* tsdb; STsdb* tsdb;
int64_t sver; int64_t sver;
int64_t ever; int64_t ever;
int32_t minutes; int32_t minutes;
int8_t precision; int8_t precision;
int32_t minRow; int32_t minRow;
int32_t maxRow; int32_t maxRow;
int8_t cmprAlg; int8_t cmprAlg;
int64_t commitID; int64_t commitID;
int32_t szPage; int32_t szPage;
int64_t compactVersion; int64_t compactVersion;
int64_t now; int64_t now;
uint8_t* aBuf[5]; SBuffer buffers[5];
TFileSetArray* fsetArr; TFileSetArray* fsetArr;
TFileOpArray fopArr[1]; TFileOpArray fopArr[1];
@ -632,7 +632,7 @@ static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) {
// open data reader // open data reader
SDataFileReaderConfig dataFileReaderConfig = { SDataFileReaderConfig dataFileReaderConfig = {
.tsdb = writer->tsdb, .tsdb = writer->tsdb,
.bufArr = writer->aBuf, .buffers = writer->buffers,
.szPage = writer->szPage, .szPage = writer->szPage,
}; };
@ -666,7 +666,7 @@ static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) {
SSttFileReaderConfig sttFileReaderConfig = { SSttFileReaderConfig sttFileReaderConfig = {
.tsdb = writer->tsdb, .tsdb = writer->tsdb,
.szPage = writer->szPage, .szPage = writer->szPage,
.bufArr = writer->aBuf, .buffers = writer->buffers,
.file = fobj->f[0], .file = fobj->f[0],
}; };
@ -939,7 +939,14 @@ static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* writer, SSnapDataHdr
SBlockData blockData[1] = {0}; SBlockData blockData[1] = {0};
code = tDecmprBlockData(hdr->data, hdr->size - sizeof(*hdr), blockData, writer->aBuf); SBuffer buffer = {
.capacity = hdr->size,
.data = hdr->data,
.size = hdr->size,
};
SBufferReader br = BUFFER_READER_INITIALIZER(0, &buffer);
code = tBlockDataDecompress(&br, blockData, &writer->buffers[0]);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
int32_t fid = tsdbKeyFid(blockData->aTSKEY[0], writer->minutes, writer->precision); int32_t fid = tsdbKeyFid(blockData->aTSKEY[0], writer->minutes, writer->precision);
@ -977,15 +984,19 @@ static int32_t tsdbSnapWriteDecmprTombBlock(SSnapDataHdr* hdr, STombBlock* tombB
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
tTombBlockClear(tombBlock);
int64_t size = hdr->size; int64_t size = hdr->size;
ASSERT(size % TOMB_RECORD_ELEM_NUM == 0); ASSERT(size % TOMB_RECORD_ELEM_NUM == 0);
size = size / TOMB_RECORD_ELEM_NUM; size = size / TOMB_RECORD_ELEM_NUM;
ASSERT(size % sizeof(int64_t) == 0); tombBlock->numOfRecords = size / sizeof(int64_t);
int64_t* data = (int64_t*)hdr->data; // int64_t* data = (int64_t*)hdr->data;
uint8_t* data = hdr->data;
for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) {
code = TARRAY2_APPEND_BATCH(&tombBlock->dataArr[i], hdr->data + i * size, size / sizeof(int64_t)); code = tBufferPut(tombBlock->buffers + i, data, size);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
data += size;
} }
_exit: _exit:
@ -1128,8 +1139,8 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) {
TARRAY2_DESTROY(writer[0]->fopArr, NULL); TARRAY2_DESTROY(writer[0]->fopArr, NULL);
tsdbFSDestroyCopyRangedSnapshot(&writer[0]->fsetArr); tsdbFSDestroyCopyRangedSnapshot(&writer[0]->fsetArr);
for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->aBuf); ++i) { for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->buffers); ++i) {
tFree(writer[0]->aBuf[i]); tBufferDestroy(writer[0]->buffers + i);
} }
taosMemoryFree(writer[0]); taosMemoryFree(writer[0]);