diff --git a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c index 90fc5b6753..dece1c7b98 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSnapshot.c +++ b/source/dnode/vnode/src/tsdb/tsdbSnapshot.c @@ -29,7 +29,7 @@ struct STsdbSnapReader { int64_t ever; int8_t type; - SBuffer buffers[5]; + SBuffer buffers[10]; SSkmInfo skmTb[1]; TFileSetRangeArray* fsrArr; @@ -554,7 +554,7 @@ struct STsdbSnapWriter { int32_t szPage; int64_t compactVersion; int64_t now; - SBuffer buffers[5]; + SBuffer buffers[10]; TFileSetArray* fsetArr; TFileOpArray fopArr[1]; diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index 43679afae6..4c30951329 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -29,8 +29,7 @@ struct SSttFileReader { TSttBlkArray sttBlkArray[1]; TStatisBlkArray statisBlkArray[1]; TTombBlkArray tombBlkArray[1]; - uint8_t *bufArr[5]; // TODO: remove here - SBuffer local[5]; + SBuffer local[10]; SBuffer *buffers; }; @@ -178,13 +177,16 @@ int32_t tsdbSttFileReadBlockData(SSttFileReader *reader, const SSttBlk *sttBlk, int32_t code = 0; int32_t lino = 0; + SBuffer *buffer0 = reader->buffers + 0; + SBuffer *assist = reader->buffers + 1; + // load data - tBufferClear(&reader->buffers[0]); - code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset, sttBlk->bInfo.szBlock, &reader->buffers[0], 0); + tBufferClear(buffer0); + code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset, sttBlk->bInfo.szBlock, buffer0, 0); TSDB_CHECK_CODE(code, lino, _exit); - SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); - code = tBlockDataDecompress(&br, bData, reader->buffers + 1); + SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0); + code = tBlockDataDecompress(&br, bData, assist); TSDB_CHECK_CODE(code, lino, _exit); _exit: @@ -194,26 +196,23 @@ _exit: return code; } -extern int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData, - SBuffer *assist); -extern int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br, - SBlockData *blockData, SBuffer *assist); - int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *sttBlk, SBlockData *bData, STSchema *pTSchema, int16_t cids[], int32_t ncid) { - int32_t code = 0; - int32_t lino = 0; - int32_t n = 0; + int32_t code = 0; + int32_t lino = 0; + SDiskDataHdr hdr; - SBlockCol primaryKeyBlockCols[TD_MAX_PK_COLS]; + SBuffer *buffer0 = reader->buffers + 0; + SBuffer *buffer1 = reader->buffers + 1; + SBuffer *assist = reader->buffers + 2; // load key part - tBufferClear(&reader->buffers[0]); - code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset, sttBlk->bInfo.szKey, &reader->buffers[0], 0); + tBufferClear(buffer0); + code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset, sttBlk->bInfo.szKey, buffer0, 0); TSDB_CHECK_CODE(code, lino, _exit); // decode header - SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); + SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0); code = tGetDiskDataHdr(&br, &hdr); TSDB_CHECK_CODE(code, lino, _exit); @@ -226,35 +225,41 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk * bData->nRow = hdr.nRow; // key part - code = tBlockDataDecompressKeyPart(&hdr, &br, bData, reader->buffers + 1); + code = tBlockDataDecompressKeyPart(&hdr, &br, bData, assist); TSDB_CHECK_CODE(code, lino, _exit); - ASSERT(br.offset == reader->buffers[0].size); + ASSERT(br.offset == buffer0->size); - if (ncid == 0) { + bool loadExtra = false; + for (int i = 0; i < ncid; i++) { + if (tBlockDataGetColData(bData, cids[i]) == NULL) { + loadExtra = true; + break; + } + } + + if (!loadExtra) { goto _exit; } // load SBlockCol part - tBufferClear(&reader->buffers[0]); - code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey, hdr.szBlkCol, &reader->buffers[0], - 0); + tBufferClear(buffer0); + code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey, hdr.szBlkCol, buffer0, 0); TSDB_CHECK_CODE(code, lino, _exit); // load each column SBlockCol blockCol = { .cid = 0, }; - br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); + br = BUFFER_READER_INITIALIZER(0, buffer0); for (int32_t i = 0; i < ncid; i++) { int16_t cid = cids[i]; - if (tBlockDataGetColData(bData, cid)) { - // this column has been loaded + if (tBlockDataGetColData(bData, cid)) { // already loaded continue; } while (cid > blockCol.cid) { - if (br.offset >= reader->buffers[0].size) { + if (br.offset >= buffer0->size) { blockCol.cid = INT16_MAX; break; } @@ -264,19 +269,32 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk * } if (cid < blockCol.cid) { - // this column as NONE - continue; + const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid); + ASSERT(tcol); + SBlockCol none = { + .cid = cid, + .type = tcol->type, + .cflag = tcol->flags, + .flag = HAS_NONE, + .szOrigin = 0, + .szBitmap = 0, + .szOffset = 0, + .szValue = 0, + .offset = 0, + }; + code = tBlockDataDecompressColData(&hdr, &none, &br, bData, assist); + TSDB_CHECK_CODE(code, lino, _exit); } else if (cid == blockCol.cid) { // load from file - tBufferClear(&reader->buffers[1]); + tBufferClear(buffer1); code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey + hdr.szBlkCol + blockCol.offset, - blockCol.szBitmap + blockCol.szOffset + blockCol.szValue, &reader->buffers[1], 0); + blockCol.szBitmap + blockCol.szOffset + blockCol.szValue, buffer1, 0); TSDB_CHECK_CODE(code, lino, _exit); // decode the buffer - SBufferReader br1 = BUFFER_READER_INITIALIZER(0, &reader->buffers[1]); - code = tBlockDataDecompressColData(&hdr, &blockCol, &br1, bData, reader->buffers + 2); + SBufferReader br1 = BUFFER_READER_INITIALIZER(0, buffer1); + code = tBlockDataDecompressColData(&hdr, &blockCol, &br1, bData, assist); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -292,14 +310,17 @@ int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *tombBlk int32_t code = 0; int32_t lino = 0; + SBuffer *buffer0 = reader->buffers + 0; + SBuffer *assist = reader->buffers + 1; + // load - tBufferClear(&reader->buffers[0]); - code = tsdbReadFileToBuffer(reader->fd, tombBlk->dp->offset, tombBlk->dp->size, &reader->buffers[0], 0); + tBufferClear(buffer0); + code = tsdbReadFileToBuffer(reader->fd, tombBlk->dp->offset, tombBlk->dp->size, buffer0, 0); TSDB_CHECK_CODE(code, lino, _exit); // decode int32_t size = 0; - SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); + SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0); tTombBlockClear(tombBlock); tombBlock->numOfRecords = tombBlk->numRec; for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->buffers); ++i) { @@ -309,11 +330,12 @@ int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *tombBlk .originalSize = tombBlk->numRec * sizeof(int64_t), .compressedSize = tombBlk->size[i], }; - code = tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tombBlock->buffers + i, reader->buffers + 1); + code = tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tombBlock->buffers + i, assist); TSDB_CHECK_CODE(code, lino, _exit); br.offset += tombBlk->size[i]; } + ASSERT(br.offset == tombBlk->dp->size); _exit: if (code) { TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code); @@ -324,20 +346,20 @@ _exit: int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *statisBlk, STbStatisBlock *statisBlock) { int32_t code = 0; int32_t lino = 0; - int64_t size = 0; - // load data from file - code = tBufferEnsureCapacity(&reader->buffers[0], statisBlk->dp->size); - TSDB_CHECK_CODE(code, lino, _exit); + SBuffer *buffer0 = reader->buffers + 0; + SBuffer *assist = reader->buffers + 1; - code = tsdbReadFile(reader->fd, statisBlk->dp->offset, reader->buffers[0].data, statisBlk->dp->size, 0); + // load data + tBufferClear(buffer0); + code = tsdbReadFileToBuffer(reader->fd, statisBlk->dp->offset, statisBlk->dp->size, buffer0, 0); TSDB_CHECK_CODE(code, lino, _exit); // decode data tStatisBlockClear(statisBlock); statisBlock->numOfPKs = statisBlk->numOfPKs; statisBlock->numOfRecords = statisBlk->numRec; - + SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0); for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) { SCompressInfo info = { .dataType = TSDB_DATA_TYPE_BIGINT, @@ -346,16 +368,14 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta .originalSize = statisBlk->numRec * sizeof(int64_t), }; - code = tDecompressDataToBuffer(tBufferGetDataAt(&reader->buffers[0], size), &info, &statisBlock->buffers[i], - &reader->buffers[1]); + code = tDecompressDataToBuffer(BR_PTR(&br), &info, &statisBlock->buffers[i], assist); TSDB_CHECK_CODE(code, lino, _exit); - size += statisBlk->size[i]; + br.offset += statisBlk->size[i]; } if (statisBlk->numOfPKs > 0) { SValueColumnCompressInfo firstKeyInfos[TD_MAX_PK_COLS]; SValueColumnCompressInfo lastKeyInfos[TD_MAX_PK_COLS]; - SBufferReader br = BUFFER_READER_INITIALIZER(size, &reader->buffers[0]); // decode compress info for (int32_t i = 0; i < statisBlk->numOfPKs; i++) { @@ -368,25 +388,21 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta TSDB_CHECK_CODE(code, lino, _exit); } - size = br.offset; - // decode value columns for (int32_t i = 0; i < statisBlk->numOfPKs; i++) { - code = tValueColumnDecompress(tBufferGetDataAt(&reader->buffers[0], size), &firstKeyInfos[i], - &statisBlock->firstKeyPKs[i], &reader->buffers[1]); + code = tValueColumnDecompress(BR_PTR(&br), firstKeyInfos + i, &statisBlock->firstKeyPKs[i], assist); TSDB_CHECK_CODE(code, lino, _exit); - size += firstKeyInfos[i].dataCompressedSize; + br.offset += (firstKeyInfos[i].dataCompressedSize + firstKeyInfos[i].offsetCompressedSize); } for (int32_t i = 0; i < statisBlk->numOfPKs; i++) { - code = tValueColumnDecompress(tBufferGetDataAt(&reader->buffers[0], size), &lastKeyInfos[i], - &statisBlock->lastKeyPKs[i], &reader->buffers[1]); + code = tValueColumnDecompress(BR_PTR(&br), &lastKeyInfos[i], &statisBlock->lastKeyPKs[i], assist); TSDB_CHECK_CODE(code, lino, _exit); - size += lastKeyInfos[i].dataCompressedSize; + br.offset += (lastKeyInfos[i].dataCompressedSize + lastKeyInfos[i].offsetCompressedSize); } } - ASSERT(size == statisBlk->dp->size); + ASSERT(br.offset == buffer0->size); _exit: if (code) { @@ -418,7 +434,7 @@ struct SSttFileWriter { // helper data SSkmInfo skmTb[1]; SSkmInfo skmRow[1]; - SBuffer local[5]; + SBuffer local[10]; SBuffer *buffers; }; @@ -488,16 +504,19 @@ _exit: } static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { - int32_t code = 0; - int32_t lino = 0; + if (writer->staticBlock->numOfRecords == 0) return 0; + + int32_t code = 0; + int32_t lino = 0; + + SBuffer *buffer0 = writer->buffers + 0; + SBuffer *buffer1 = writer->buffers + 1; + SBuffer *assist = writer->buffers + 2; + STbStatisRecord record; STbStatisBlock *statisBlock = writer->staticBlock; SStatisBlk statisBlk = {0}; - if (statisBlock->numOfRecords == 0) { - return 0; - } - statisBlk.dp->offset = writer->file->size; statisBlk.dp->size = 0; statisBlk.numRec = statisBlock->numOfRecords; @@ -520,12 +539,11 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { .originalSize = statisBlock->buffers[i].size, }; - tBufferClear(&writer->buffers[0]); - code = tCompressDataToBuffer(tBufferGetData(&statisBlock->buffers[i]), &info, &writer->buffers[0], - &writer->buffers[1]); + tBufferClear(buffer0); + code = tCompressDataToBuffer(statisBlock->buffers[i].data, &info, buffer0, assist); TSDB_CHECK_CODE(code, lino, _exit); - code = tsdbWriteFile(writer->fd, writer->file->size, tBufferGetData(&writer->buffers[0]), info.compressedSize); + code = tsdbWriteFile(writer->fd, writer->file->size, buffer0->data, info.compressedSize); TSDB_CHECK_CODE(code, lino, _exit); statisBlk.size[i] = info.compressedSize; @@ -537,35 +555,32 @@ static int32_t tsdbSttFileDoWriteStatisBlock(SSttFileWriter *writer) { if (statisBlk.numOfPKs > 0) { SValueColumnCompressInfo compressInfo = {.cmprAlg = statisBlk.cmprAlg}; - tBufferClear(&writer->buffers[0]); - tBufferClear(&writer->buffers[1]); + tBufferClear(buffer0); + tBufferClear(buffer0); for (int32_t i = 0; i < statisBlk.numOfPKs; i++) { - code = - tValueColumnCompress(&statisBlock->firstKeyPKs[i], &compressInfo, &writer->buffers[1], &writer->buffers[2]); + code = tValueColumnCompress(&statisBlock->firstKeyPKs[i], &compressInfo, buffer1, assist); TSDB_CHECK_CODE(code, lino, _exit); - - code = tValueColumnCompressInfoEncode(&compressInfo, &writer->buffers[0]); + code = tValueColumnCompressInfoEncode(&compressInfo, buffer0); TSDB_CHECK_CODE(code, lino, _exit); } for (int32_t i = 0; i < statisBlk.numOfPKs; i++) { - code = tValueColumnCompress(&statisBlock->lastKeyPKs[i], &compressInfo, &writer->buffers[1], &writer->buffers[2]); + code = tValueColumnCompress(&statisBlock->lastKeyPKs[i], &compressInfo, buffer1, assist); TSDB_CHECK_CODE(code, lino, _exit); - - code = tValueColumnCompressInfoEncode(&compressInfo, &writer->buffers[0]); + code = tValueColumnCompressInfoEncode(&compressInfo, buffer0); TSDB_CHECK_CODE(code, lino, _exit); } - code = tsdbWriteFile(writer->fd, writer->file->size, writer->buffers[0].data, writer->buffers[0].size); + code = tsdbWriteFile(writer->fd, writer->file->size, buffer0->data, buffer0->size); TSDB_CHECK_CODE(code, lino, _exit); - writer->file->size += writer->buffers[0].size; - statisBlk.dp->size += writer->buffers[0].size; + writer->file->size += buffer0->size; + statisBlk.dp->size += buffer0->size; - code = tsdbWriteFile(writer->fd, writer->file->size, writer->buffers[1].data, writer->buffers[1].size); + code = tsdbWriteFile(writer->fd, writer->file->size, buffer1->data, buffer1->size); TSDB_CHECK_CODE(code, lino, _exit); - writer->file->size += writer->buffers[1].size; - statisBlk.dp->size += writer->buffers[1].size; + writer->file->size += buffer1->size; + statisBlk.dp->size += buffer1->size; } code = TARRAY2_APPEND_PTR(writer->statisBlkArray, &statisBlk);