From fb4e17a0cbe1ca40a24be65bb2011d48d0fe36d0 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 5 Mar 2024 17:06:14 +0800 Subject: [PATCH] more code --- source/dnode/vnode/src/tsdb/tsdbSnapshot.c | 91 ++++++++++++---------- 1 file changed, 51 insertions(+), 40 deletions(-) diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 952668bc95..90fc5b6753 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -94,7 +94,7 @@ static int32_t tsdbSnapReadFileSetOpenReader(STsdbSnapReader* reader) { .tsdb = reader->tsdb, .szPage = reader->tsdb->pVnode->config.tsdbPageSize, .file = fobj->f[0], - .bufArr = reader->aBuf, + .buffers = reader->buffers, }; code = tsdbSttFileReaderOpen(fobj->fname, &config, &sttReader); @@ -247,11 +247,14 @@ static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) { int32_t code = 0; int32_t lino = 0; - int32_t aBufN[5] = {0}; - code = tCmprBlockData(reader->blockData, NO_COMPRESSION, NULL, NULL, reader->aBuf, aBufN); + code = tBlockDataCompress(reader->blockData, NO_COMPRESSION, reader->buffers, reader->buffers + 4); TSDB_CHECK_CODE(code, lino, _exit); + // TSDB_CHECK_CODE(code, lino, _exit); - int32_t size = aBufN[0] + aBufN[1] + aBufN[2] + aBufN[3]; + int32_t size = 0; + for (int i = 0; i < 4; i++) { + size += reader->buffers[i].size; + } *data = taosMemoryMalloc(sizeof(SSnapDataHdr) + size); if (*data == NULL) { code = TSDB_CODE_OUT_OF_MEMORY; @@ -259,16 +262,13 @@ static int32_t tsdbSnapCmprData(STsdbSnapReader* reader, uint8_t** data) { } SSnapDataHdr* pHdr = (SSnapDataHdr*)*data; + uint8_t* pBuf = pHdr->data; + pHdr->type = reader->type; pHdr->size = size; - - memcpy(pHdr->data, reader->aBuf[3], aBufN[3]); - memcpy(pHdr->data + aBufN[3], reader->aBuf[2], aBufN[2]); - if (aBufN[1]) { - memcpy(pHdr->data + aBufN[3] + aBufN[2], reader->aBuf[1], aBufN[1]); - } - if (aBufN[0]) { - memcpy(pHdr->data + aBufN[3] + aBufN[2] + aBufN[1], reader->aBuf[0], aBufN[0]); + for (int i = 0; i < 4; i++) { + memcpy(pBuf, reader->buffers[i].data, reader->buffers[i].size); + pBuf += reader->buffers[i].size; } _exit: @@ -355,8 +355,8 @@ static int32_t tsdbSnapCmprTombData(STsdbSnapReader* reader, uint8_t** data) { int32_t lino = 0; int64_t size = 0; - for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->dataArr); i++) { - size += TARRAY2_DATA_LEN(reader->tombBlock->dataArr + i); + for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) { + size += reader->tombBlock->buffers[i].size; } data[0] = taosMemoryMalloc(size + sizeof(SSnapDataHdr)); @@ -370,9 +370,9 @@ static int32_t tsdbSnapCmprTombData(STsdbSnapReader* reader, uint8_t** data) { hdr->size = size; uint8_t* tdata = hdr->data; - for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->dataArr); i++) { - memcpy(tdata, TARRAY2_DATA(reader->tombBlock->dataArr + i), TARRAY2_DATA_LEN(reader->tombBlock->dataArr + i)); - tdata += TARRAY2_DATA_LEN(reader->tombBlock->dataArr + i); + for (int32_t i = 0; i < ARRAY_SIZE(reader->tombBlock->buffers); i++) { + memcpy(tdata, reader->tombBlock->buffers[i].data, reader->tombBlock->buffers[i].size); + tdata += reader->tombBlock->buffers[i].size; } _exit: @@ -475,8 +475,8 @@ int32_t tsdbSnapReaderClose(STsdbSnapReader** reader) { tsdbFSDestroyRefRangedSnapshot(&reader[0]->fsrArr); tDestroyTSchema(reader[0]->skmTb->pTSchema); - for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->aBuf); ++i) { - tFree(reader[0]->aBuf[i]); + for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->buffers); ++i) { + tBufferDestroy(reader[0]->buffers + i); } taosMemoryFree(reader[0]); @@ -542,19 +542,19 @@ _exit: // STsdbSnapWriter ======================================== struct STsdbSnapWriter { - STsdb* tsdb; - int64_t sver; - int64_t ever; - int32_t minutes; - int8_t precision; - int32_t minRow; - int32_t maxRow; - int8_t cmprAlg; - int64_t commitID; - int32_t szPage; - int64_t compactVersion; - int64_t now; - uint8_t* aBuf[5]; + STsdb* tsdb; + int64_t sver; + int64_t ever; + int32_t minutes; + int8_t precision; + int32_t minRow; + int32_t maxRow; + int8_t cmprAlg; + int64_t commitID; + int32_t szPage; + int64_t compactVersion; + int64_t now; + SBuffer buffers[5]; TFileSetArray* fsetArr; TFileOpArray fopArr[1]; @@ -632,7 +632,7 @@ static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) { // open data reader SDataFileReaderConfig dataFileReaderConfig = { .tsdb = writer->tsdb, - .bufArr = writer->aBuf, + .buffers = writer->buffers, .szPage = writer->szPage, }; @@ -666,7 +666,7 @@ static int32_t tsdbSnapWriteFileSetOpenReader(STsdbSnapWriter* writer) { SSttFileReaderConfig sttFileReaderConfig = { .tsdb = writer->tsdb, .szPage = writer->szPage, - .bufArr = writer->aBuf, + .buffers = writer->buffers, .file = fobj->f[0], }; @@ -939,7 +939,14 @@ static int32_t tsdbSnapWriteTimeSeriesData(STsdbSnapWriter* writer, SSnapDataHdr SBlockData blockData[1] = {0}; - code = tDecmprBlockData(hdr->data, hdr->size - sizeof(*hdr), blockData, writer->aBuf); + SBuffer buffer = { + .capacity = hdr->size, + .data = hdr->data, + .size = hdr->size, + }; + SBufferReader br = BUFFER_READER_INITIALIZER(0, &buffer); + + code = tBlockDataDecompress(&br, blockData, &writer->buffers[0]); TSDB_CHECK_CODE(code, lino, _exit); int32_t fid = tsdbKeyFid(blockData->aTSKEY[0], writer->minutes, writer->precision); @@ -977,15 +984,19 @@ static int32_t tsdbSnapWriteDecmprTombBlock(SSnapDataHdr* hdr, STombBlock* tombB int32_t code = 0; int32_t lino = 0; + tTombBlockClear(tombBlock); + int64_t size = hdr->size; ASSERT(size % TOMB_RECORD_ELEM_NUM == 0); size = size / TOMB_RECORD_ELEM_NUM; - ASSERT(size % sizeof(int64_t) == 0); + tombBlock->numOfRecords = size / sizeof(int64_t); - int64_t* data = (int64_t*)hdr->data; + // int64_t* data = (int64_t*)hdr->data; + uint8_t* data = hdr->data; for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) { - code = TARRAY2_APPEND_BATCH(&tombBlock->dataArr[i], hdr->data + i * size, size / sizeof(int64_t)); + code = tBufferPut(tombBlock->buffers + i, data, size); TSDB_CHECK_CODE(code, lino, _exit); + data += size; } _exit: @@ -1128,8 +1139,8 @@ int32_t tsdbSnapWriterClose(STsdbSnapWriter** writer, int8_t rollback) { TARRAY2_DESTROY(writer[0]->fopArr, NULL); tsdbFSDestroyCopyRangedSnapshot(&writer[0]->fsetArr); - for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->aBuf); ++i) { - tFree(writer[0]->aBuf[i]); + for (int32_t i = 0; i < ARRAY_SIZE(writer[0]->buffers); ++i) { + tBufferDestroy(writer[0]->buffers + i); } taosMemoryFree(writer[0]);