diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 778fe078f7..57ed48cb99 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -405,17 +405,18 @@ _exit: int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *record, TColumnDataAggArray *columnDataAggArray) { - int32_t code = 0; - int32_t lino = 0; + int32_t code = 0; + int32_t lino = 0; + SBuffer *buffer = reader->buffers + 0; TARRAY2_CLEAR(columnDataAggArray, NULL); if (record->smaSize > 0) { - tBufferClear(&reader->buffers[0]); - code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, record->smaSize, &reader->buffers[0], 0); + tBufferClear(buffer); + code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_SMA], record->smaOffset, record->smaSize, buffer, 0); TSDB_CHECK_CODE(code, lino, _exit); // decode sma data - SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); + SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer); while (br.offset < record->smaSize) { SColumnDataAgg sma[1]; @@ -478,13 +479,15 @@ int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombB int32_t code = 0; int32_t lino = 0; - tBufferClear(&reader->buffers[0]); - code = - tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, tombBlk->dp->size, &reader->buffers[0], 0); + SBuffer *buffer0 = reader->buffers + 0; + SBuffer *assist = reader->buffers + 1; + + tBufferClear(buffer0); + code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_TOMB], tombBlk->dp->offset, tombBlk->dp->size, buffer0, 0); TSDB_CHECK_CODE(code, lino, _exit); int32_t size = 0; - SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); + SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0); tTombBlockClear(tData); tData->numOfRecords = tombBlk->numRec; for (int32_t i = 0; i < ARRAY_SIZE(tData->buffers); ++i) { @@ -494,7 +497,7 @@ int32_t tsdbDataFileReadTombBlock(SDataFileReader *reader, const STombBlk *tombB .originalSize = tombBlk->numRec * sizeof(int64_t), .compressedSize = tombBlk->size[i], }; - code = tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tData->buffers + i, reader->buffers + 1); + code = tDecompressDataToBuffer(BR_PTR(&br), &cinfo, tData->buffers + i, assist); TSDB_CHECK_CODE(code, lino, _exit); br.offset += tombBlk->size[i]; } @@ -512,7 +515,7 @@ struct SDataFileWriter { SSkmInfo skmTb[1]; SSkmInfo skmRow[1]; - SBuffer local[5]; + SBuffer local[10]; SBuffer *buffers; struct { @@ -710,7 +713,10 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl TBrinBlkArray *brinBlkArray, SBuffer *buffers, SVersionRange *range) { if (brinBlock->numOfRecords == 0) return 0; - int32_t code; + int32_t code; + SBuffer *buffer0 = buffers + 0; + SBuffer *buffer1 = buffers + 1; + SBuffer *assist = buffers + 2; SBrinBlk brinBlk = { .dp[0] = @@ -747,28 +753,34 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl tsdbWriterUpdVerRange(range, brinBlk.minVer, brinBlk.maxVer); // write to file - for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) { - SBuffer *bf = &brinBlock->buffers[i]; + for (int32_t i = 0; i < 10; ++i) { SCompressInfo info = { .cmprAlg = cmprAlg, - .originalSize = bf->size, + .dataType = TSDB_DATA_TYPE_BIGINT, + .originalSize = brinBlock->buffers[i].size, }; - if (tBufferGetSize(bf) == 8 * brinBlock->numOfRecords) { - info.dataType = TSDB_DATA_TYPE_BIGINT; - } else if (tBufferGetSize(bf) == 4 * brinBlock->numOfRecords) { - info.dataType = TSDB_DATA_TYPE_INT; - } else { - ASSERT(0); - } - - tBufferClear(&buffers[0]); - code = tCompressDataToBuffer(tBufferGetData(bf), &info, buffers[0].data, &buffers[1]); + tBufferClear(buffer0); + code = tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist); if (code) return code; - - code = tsdbWriteFile(fd, *fileSize, buffers[0].data, buffers[0].size); + code = tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size); if (code) return code; + brinBlk.size[i] = info.compressedSize; + brinBlk.dp->size += info.compressedSize; + *fileSize += info.compressedSize; + } + for (int32_t i = 10; i < 15; ++i) { + SCompressInfo info = { + .cmprAlg = cmprAlg, + .dataType = TSDB_DATA_TYPE_INT, + .originalSize = brinBlock->buffers[i].size, + }; + tBufferClear(buffer0); + code = tCompressDataToBuffer(brinBlock->buffers[i].data, &info, buffer0, assist); + if (code) return code; + code = tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size); + if (code) return code; brinBlk.size[i] = info.compressedSize; brinBlk.dp->size += info.compressedSize; *fileSize += info.compressedSize; @@ -776,37 +788,28 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl // write primary keys to file if (brinBlock->numOfPKs > 0) { - for (int i = 0; i < 2; i++) { - tBufferClear(&buffers[i]); - } + tBufferClear(buffer0); + tBufferClear(buffer1); // encode for (int i = 0; i < brinBlock->numOfPKs; i++) { - SValueColumnCompressInfo info = { - .cmprAlg = cmprAlg, - }; - code = tValueColumnCompress(&brinBlock->firstKeyPKs[i], &info, &buffers[1], &buffers[2]); - if (code) return code; - code = tValueColumnCompressInfoEncode(&info, &buffers[0]); - if (code) return code; + SValueColumnCompressInfo info = {.cmprAlg = cmprAlg}; + if ((code = tValueColumnCompress(&brinBlock->firstKeyPKs[i], &info, buffer1, assist))) return code; + if ((code = tValueColumnCompressInfoEncode(&info, buffer0))) return code; } for (int i = 0; i < brinBlock->numOfPKs; i++) { - SValueColumnCompressInfo info = { - .cmprAlg = cmprAlg, - }; - code = tValueColumnCompress(&brinBlock->lastKeyPKs[i], &info, &buffers[1], &buffers[2]); - if (code) return code; - code = tValueColumnCompressInfoEncode(&info, &buffers[0]); - if (code) return code; + SValueColumnCompressInfo info = {.cmprAlg = cmprAlg}; + if ((code = tValueColumnCompress(&brinBlock->lastKeyPKs[i], &info, buffer1, assist))) return code; + if ((code = tValueColumnCompressInfoEncode(&info, buffer0))) return code; } // write to file - for (int i = 0; i < 2; i++) { - code = tsdbWriteFile(fd, *fileSize, buffers[i].data, buffers[i].size); - if (code) return code; - brinBlk.dp->size += buffers[i].size; - *fileSize += buffers[i].size; - } + if ((code = tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size))) return code; + *fileSize += buffer0->size; + brinBlk.dp->size += buffer0->size; + if ((code = tsdbWriteFile(fd, *fileSize, buffer1->data, buffer1->size))) return code; + *fileSize += buffer1->size; + brinBlk.dp->size += buffer1->size; } // append to brinBlkArray @@ -870,8 +873,10 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData ASSERT(bData->uid); - int32_t code = 0; - int32_t lino = 0; + int32_t code = 0; + int32_t lino = 0; + SBuffer *buffers = writer->buffers; + SBuffer *assist = writer->buffers + 4; SBrinRecord record[1] = {{ .suid = bData->suid, @@ -905,21 +910,21 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData tsdbWriterUpdVerRange(&writer->ctx->range, record->minVer, record->maxVer); // to .data file - code = tBlockDataCompress(bData, writer->config->cmprAlg, writer->buffers, &writer->buffers[4]); + code = tBlockDataCompress(bData, writer->config->cmprAlg, buffers, assist); TSDB_CHECK_CODE(code, lino, _exit); - record->blockKeySize = writer->buffers[0].size + writer->buffers[1].size; - record->blockSize = record->blockKeySize + writer->buffers[2].size + writer->buffers[3].size; + record->blockKeySize = buffers[0].size + buffers[1].size; + record->blockSize = record->blockKeySize + buffers[2].size + buffers[3].size; for (int i = 0; i < 4; i++) { - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size, writer->buffers[i].data, - tBufferGetSize(&writer->buffers[i])); + code = tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size, buffers[i].data, + buffers[i].size); TSDB_CHECK_CODE(code, lino, _exit); - writer->files[TSDB_FTYPE_DATA].size += tBufferGetSize(&writer->buffers[i]); + writer->files[TSDB_FTYPE_DATA].size += buffers[i].size; } // to .sma file - tBufferClear(&writer->buffers[0]); + tBufferClear(&buffers[0]); for (int32_t i = 0; i < bData->nColData; ++i) { SColData *colData = bData->aColData + i; if ((colData->cflag & COL_SMA_ON) == 0 || ((colData->flag & HAS_VALUE) == 0)) continue; @@ -927,13 +932,13 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData SColumnDataAgg sma[1] = {{.colId = colData->cid}}; tColDataCalcSMA[colData->type](colData, &sma->sum, &sma->max, &sma->min, &sma->numOfNull); - code = tPutColumnDataAgg(&writer->buffers[0], sma); + code = tPutColumnDataAgg(&buffers[0], sma); TSDB_CHECK_CODE(code, lino, _exit); } - record->smaSize = tBufferGetSize(&writer->buffers[0]); + record->smaSize = buffers[0].size; if (record->smaSize > 0) { - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record->smaOffset, writer->buffers[0].data, record->smaSize); + code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record->smaOffset, buffers[0].data, record->smaSize); TSDB_CHECK_CODE(code, lino, _exit); writer->files[TSDB_FTYPE_SMA].size += record->smaSize; } @@ -1214,6 +1219,9 @@ int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAl if (TOMB_BLOCK_SIZE(tombBlock) == 0) return 0; + SBuffer *buffer0 = buffers + 0; + SBuffer *assist = buffers + 1; + STombBlk tombBlk = { .dp[0] = { @@ -1248,17 +1256,17 @@ int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAl tsdbWriterUpdVerRange(range, tombBlk.minVer, tombBlk.maxVer); for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->buffers); i++) { - tBufferClear(&buffers[0]); + tBufferClear(buffer0); SCompressInfo cinfo = { .cmprAlg = cmprAlg, .dataType = TSDB_DATA_TYPE_BIGINT, .originalSize = tombBlock->buffers[i].size, }; - code = tCompressDataToBuffer(tombBlock->buffers[i].data, &cinfo, &buffers[0], &buffers[1]); + code = tCompressDataToBuffer(tombBlock->buffers[i].data, &cinfo, buffer0, assist); if (code) return code; - code = tsdbWriteFile(fd, *fileSize, buffers[0].data, buffers[0].size); + code = tsdbWriteFile(fd, *fileSize, buffer0->data, buffer0->size); if (code) return code; tombBlk.size[i] = cinfo.compressedSize;