more code

This commit is contained in:
Hongze Cheng 2024-03-05 16:47:59 +08:00
parent 1440c1b942
commit a728f2b5c9
3 changed files with 95 additions and 262 deletions

View File

@ -329,6 +329,8 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
code = tGetDiskDataHdr(&br, &hdr);
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
tBlockDataReset(bData);
bData->suid = hdr.suid;
bData->uid = hdr.uid;
@ -337,7 +339,6 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
// Key part
code = tBlockDataDecompressKeyPart(&hdr, &br, bData, reader->buffers + 1);
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(br.offset == reader->buffers[0].size);
if (ncid == 0) {
@ -1208,29 +1209,6 @@ int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAl
if (TOMB_BLOCK_SIZE(tombBlock) == 0) return 0;
#if 0
STombBlk tombBlk[1] = {{
.dp[0] =
{
.offset = *fileSize,
.size = 0,
},
.minTbid =
{
.suid = TARRAY2_FIRST(tombBlock->suid),
.uid = TARRAY2_FIRST(tombBlock->uid),
},
.maxTbid =
{
.suid = TARRAY2_LAST(tombBlock->suid),
.uid = TARRAY2_LAST(tombBlock->uid),
},
.minVer = TARRAY2_FIRST(tombBlock->version),
.maxVer = TARRAY2_FIRST(tombBlock->version),
.numRec = TOMB_BLOCK_SIZE(tombBlock),
.cmprAlg = cmprAlg,
}};
#endif
STombBlk tombBlk = {
.dp[0] =
{

View File

@ -30,6 +30,7 @@ struct SSttFileReader {
TStatisBlkArray statisBlkArray[1];
TTombBlkArray tombBlkArray[1];
uint8_t *bufArr[5]; // TODO: remove here
SBuffer local[5];
SBuffer *buffers;
};
@ -42,8 +43,9 @@ int32_t tsdbSttFileReaderOpen(const char *fname, const SSttFileReaderConfig *con
if (reader[0] == NULL) return TSDB_CODE_OUT_OF_MEMORY;
reader[0]->config[0] = config[0];
if (reader[0]->config->bufArr == NULL) {
reader[0]->config->bufArr = reader[0]->bufArr;
reader[0]->buffers = config->buffers;
if (reader[0]->buffers == NULL) {
reader[0]->buffers = reader[0]->local;
}
// open file
@ -74,8 +76,8 @@ _exit:
int32_t tsdbSttFileReaderClose(SSttFileReader **reader) {
if (reader[0]) {
for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->bufArr); ++i) {
tFree(reader[0]->bufArr[i]);
for (int32_t i = 0; i < ARRAY_SIZE(reader[0]->local); ++i) {
tBufferDestroy(reader[0]->local + i);
}
tsdbCloseFile(&reader[0]->fd);
TARRAY2_DESTROY(reader[0]->tombBlkArray, NULL);
@ -176,13 +178,13 @@ int32_t tsdbSttFileReadBlockData(SSttFileReader *reader, const SSttBlk *sttBlk,
int32_t code = 0;
int32_t lino = 0;
code = tRealloc(&reader->config->bufArr[0], sttBlk->bInfo.szBlock);
// load data
tBufferClear(&reader->buffers[0]);
code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset, sttBlk->bInfo.szBlock, &reader->buffers[0], 0);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset, reader->config->bufArr[0], sttBlk->bInfo.szBlock, 0);
TSDB_CHECK_CODE(code, lino, _exit);
code = tDecmprBlockData(reader->config->bufArr[0], sttBlk->bInfo.szBlock, bData, &reader->config->bufArr[1]);
SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
code = tBlockDataDecompress(&br, bData, reader->buffers + 1);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
@ -193,6 +195,10 @@ _exit:
}
extern int32_t tBlockColAndColumnCmpr(const void *p1, const void *p2);
extern int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, SBlockData *blockData,
SBuffer *assist);
extern int32_t tBlockDataDecompressColData(const SDiskDataHdr *hdr, const SBlockCol *blockCol, SBufferReader *br,
SBlockData *blockData, SBuffer *assist);
int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *sttBlk, SBlockData *bData,
STSchema *pTSchema, int16_t cids[], int32_t ncid) {
@ -203,14 +209,14 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
SBlockCol primaryKeyBlockCols[TD_MAX_PK_COLS];
// load key part
code = tRealloc(&reader->config->bufArr[0], sttBlk->bInfo.szKey);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset, reader->config->bufArr[0], sttBlk->bInfo.szKey, 0);
tBufferClear(&reader->buffers[0]);
code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset, sttBlk->bInfo.szKey, &reader->buffers[0], 0);
TSDB_CHECK_CODE(code, lino, _exit);
// decode header
n += tGetDiskDataHdr(reader->config->bufArr[0] + n, &hdr);
SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
code = tGetDiskDataHdr(&br, &hdr);
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
@ -220,217 +226,62 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
bData->uid = (sttBlk->suid == 0) ? sttBlk->minUid : 0;
bData->nRow = hdr.nRow;
// decode primary key column indices
for (int32_t i = 0; i < hdr.numOfPKs; i++) {
n += tGetBlockCol(reader->config->bufArr[0] + n, primaryKeyBlockCols + i);
}
// uid
if (hdr.uid == 0) {
ASSERT(hdr.szUid);
code = tsdbDecmprData(reader->config->bufArr[0] + n, hdr.szUid, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg,
(uint8_t **)&bData->aUid, sizeof(int64_t) * hdr.nRow, &reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
ASSERT(hdr.szUid == 0);
}
n += hdr.szUid;
// version
code = tsdbDecmprData(reader->config->bufArr[0] + n, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg,
(uint8_t **)&bData->aVersion, sizeof(int64_t) * hdr.nRow, &reader->config->bufArr[1]);
// key part
code = tBlockDataDecompressKeyPart(&hdr, &br, bData, reader->buffers + 1);
TSDB_CHECK_CODE(code, lino, _exit);
n += hdr.szVer;
ASSERT(br.offset == reader->buffers[0].size);
// ts
code = tsdbDecmprData(reader->config->bufArr[0] + n, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg,
(uint8_t **)&bData->aTSKEY, sizeof(TSKEY) * hdr.nRow, &reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
n += hdr.szKey;
// decode primary key columns
for (int32_t i = 0; i < hdr.numOfPKs; i++) {
SColData *pColData;
code = tBlockDataAddColData(bData, primaryKeyBlockCols[i].cid, primaryKeyBlockCols[i].type,
primaryKeyBlockCols[i].cflag, &pColData);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDecmprColData(reader->config->bufArr[0] + n, &primaryKeyBlockCols[i], hdr.cmprAlg, hdr.nRow, pColData,
&reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
n += (primaryKeyBlockCols[i].szBitmap + primaryKeyBlockCols[i].szOffset + primaryKeyBlockCols[i].szValue);
if (ncid == 0) {
goto _exit;
}
ASSERT(n == sttBlk->bInfo.szKey);
// load SBlockCol part
tBufferClear(&reader->buffers[0]);
code = tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey, hdr.szBlkCol, &reader->buffers[0],
0);
TSDB_CHECK_CODE(code, lino, _exit);
// regular columns load
bool blockColLoaded = false;
int32_t decodedBufferSize = 0;
SBlockCol blockCol = {.cid = 0};
// load each column
SBlockCol blockCol = {
.cid = 0,
};
br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
for (int32_t i = 0; i < ncid; i++) {
SColData *pColData = tBlockDataGetColData(bData, cids[i]);
if (pColData != NULL) continue;
int16_t cid = cids[i];
// load the column index if not loaded yet
if (!blockColLoaded) {
if (hdr.szBlkCol > 0) {
code = tRealloc(&reader->config->bufArr[0], hdr.szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey, reader->config->bufArr[0],
hdr.szBlkCol, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
blockColLoaded = true;
if (tBlockDataGetColData(bData, cid)) {
// this column has been loaded
continue;
}
// search the column index
for (;;) {
if (blockCol.cid >= cids[i]) {
break;
}
if (decodedBufferSize >= hdr.szBlkCol) {
while (cid > blockCol.cid) {
if (br.offset >= reader->buffers[0].size) {
blockCol.cid = INT16_MAX;
break;
}
decodedBufferSize += tGetBlockCol(reader->config->bufArr[0] + decodedBufferSize, &blockCol);
code = tGetBlockCol(&br, &blockCol);
TSDB_CHECK_CODE(code, lino, _exit);
}
STColumn *pTColumn =
taosbsearch(&blockCol, pTSchema->columns, pTSchema->numOfCols, sizeof(STSchema), tBlockColAndColumnCmpr, TD_EQ);
ASSERT(pTColumn != NULL);
code = tBlockDataAddColData(bData, cids[i], pTColumn->type, pTColumn->flags, &pColData);
TSDB_CHECK_CODE(code, lino, _exit);
// fill the column data
if (blockCol.cid > cids[i]) {
// set as all NONE
for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) { // all NONE
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
TSDB_CHECK_CODE(code, lino, _exit);
}
} else if (blockCol.flag == HAS_NULL) { // all NULL
for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NULL(blockCol.cid, blockCol.type));
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
int32_t size1 = blockCol.szBitmap + blockCol.szOffset + blockCol.szValue;
code = tRealloc(&reader->config->bufArr[1], size1);
if (cid < blockCol.cid) {
// this column as NONE
continue;
} else if (cid == blockCol.cid) {
// load from file
tBufferClear(&reader->buffers[1]);
code =
tsdbReadFileToBuffer(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey + hdr.szBlkCol + blockCol.offset,
blockCol.szBitmap + blockCol.szOffset + blockCol.szValue, &reader->buffers[1], 0);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey + hdr.szBlkCol + blockCol.offset,
reader->config->bufArr[1], size1, 0);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDecmprColData(reader->config->bufArr[1], &blockCol, hdr.cmprAlg, hdr.nRow, pColData,
&reader->config->bufArr[2]);
// decode the buffer
SBufferReader br1 = BUFFER_READER_INITIALIZER(0, &reader->buffers[1]);
code = tBlockDataDecompressColData(&hdr, &blockCol, &br1, bData, reader->buffers + 2);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
#if 0
// hdr
SDiskDataHdr hdr[1];
int32_t size = 0;
size += tGetDiskDataHdr(reader->config->bufArr[0] + size, hdr);
bData->nRow = hdr->nRow;
bData->uid = hdr->uid;
// uid
if (hdr->uid == 0) {
ASSERT(hdr->szUid);
code = tsdbDecmprData(reader->config->bufArr[0] + size, hdr->szUid, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg,
(uint8_t **)&bData->aUid, sizeof(int64_t) * hdr->nRow, &reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
ASSERT(hdr->szUid == 0);
}
size += hdr->szUid;
// version
code = tsdbDecmprData(reader->config->bufArr[0] + size, hdr->szVer, TSDB_DATA_TYPE_BIGINT, hdr->cmprAlg,
(uint8_t **)&bData->aVersion, sizeof(int64_t) * hdr->nRow, &reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
size += hdr->szVer;
// ts
code = tsdbDecmprData(reader->config->bufArr[0] + size, hdr->szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr->cmprAlg,
(uint8_t **)&bData->aTSKEY, sizeof(TSKEY) * hdr->nRow, &reader->config->bufArr[1]);
TSDB_CHECK_CODE(code, lino, _exit);
size += hdr->szKey;
ASSERT(size == sttBlk->bInfo.szKey);
// other columns
if (bData->nColData > 0) {
if (hdr->szBlkCol > 0) {
code = tRealloc(&reader->config->bufArr[0], hdr->szBlkCol);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey, reader->config->bufArr[0],
hdr->szBlkCol, 0);
TSDB_CHECK_CODE(code, lino, _exit);
}
SBlockCol bc[1] = {{.cid = 0}};
SBlockCol *blockCol = bc;
size = 0;
for (int32_t i = 0; i < bData->nColData; i++) {
SColData *colData = tBlockDataGetColDataByIdx(bData, i);
while (blockCol && blockCol->cid < colData->cid) {
if (size < hdr->szBlkCol) {
size += tGetBlockCol(reader->config->bufArr[0] + size, blockCol);
} else {
ASSERT(size == hdr->szBlkCol);
blockCol = NULL;
}
}
if (blockCol == NULL || blockCol->cid > colData->cid) {
for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) {
code = tColDataAppendValue(colData, &COL_VAL_NONE(colData->cid, colData->type));
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
ASSERT(blockCol->type == colData->type);
ASSERT(blockCol->flag && blockCol->flag != HAS_NONE);
if (blockCol->flag == HAS_NULL) {
for (int32_t iRow = 0; iRow < hdr->nRow; iRow++) {
code = tColDataAppendValue(colData, &COL_VAL_NULL(blockCol->cid, blockCol->type));
TSDB_CHECK_CODE(code, lino, _exit);
}
} else {
int32_t size1 = blockCol->szBitmap + blockCol->szOffset + blockCol->szValue;
code = tRealloc(&reader->config->bufArr[1], size1);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd, sttBlk->bInfo.offset + sttBlk->bInfo.szKey + hdr->szBlkCol + blockCol->offset,
reader->config->bufArr[1], size1, 0);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbDecmprColData(reader->config->bufArr[1], blockCol, hdr->cmprAlg, hdr->nRow, colData,
&reader->config->bufArr[2]);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
}
}
#endif
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
@ -442,26 +293,29 @@ int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *tombBlk
int32_t code = 0;
int32_t lino = 0;
code = tRealloc(&reader->config->bufArr[0], tombBlk->dp->size);
// load
tBufferClear(&reader->buffers[0]);
code = tsdbReadFileToBuffer(reader->fd, tombBlk->dp->offset, tombBlk->dp->size, &reader->buffers[0], 0);
TSDB_CHECK_CODE(code, lino, _exit);
code = tsdbReadFile(reader->fd, tombBlk->dp->offset, reader->config->bufArr[0], tombBlk->dp->size, 0);
if (code) TSDB_CHECK_CODE(code, lino, _exit);
int64_t size = 0;
// decode
int32_t size = 0;
SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
tTombBlockClear(tombBlock);
for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->dataArr); ++i) {
code = tsdbDecmprData(reader->config->bufArr[0] + size, tombBlk->size[i], TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg,
&reader->config->bufArr[1], sizeof(int64_t) * tombBlk->numRec, &reader->config->bufArr[2]);
tombBlock->numOfRecords = tombBlk->numRec;
for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->buffers); ++i) {
SCompressInfo cinfo = {
.cmprAlg = tombBlk->cmprAlg,
.dataType = TSDB_DATA_TYPE_BIGINT,
.originalSize = tombBlk->numRec * sizeof(int64_t),
.compressedSize = tombBlk->size[i],
};
code =
tDecompressDataToBuffer(BR_PTR(&br), cinfo.compressedSize, &cinfo, tombBlock->buffers + i, reader->buffers + 1);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND_BATCH(&tombBlock->dataArr[i], reader->config->bufArr[1], tombBlk->numRec);
TSDB_CHECK_CODE(code, lino, _exit);
size += tombBlk->size[i];
br.offset += tombBlk->size[i];
}
ASSERT(size == tombBlk->dp->size);
_exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(reader->config->tsdb->pVnode), lino, code);
@ -566,12 +420,12 @@ struct SSttFileWriter {
// helper data
SSkmInfo skmTb[1];
SSkmInfo skmRow[1];
uint8_t *bufArr[5]; // TODO: remove this
SBuffer local[5];
SBuffer *buffers;
};
static int32_t tsdbFileDoWriteSttBlockData(STsdbFD *fd, SBlockData *blockData, int8_t cmprAlg, int64_t *fileSize,
TSttBlkArray *sttBlkArray, uint8_t **bufArr, SVersionRange *range) {
TSttBlkArray *sttBlkArray, SBuffer *buffers, SVersionRange *range) {
if (blockData->nRow == 0) return 0;
int32_t code = 0;
@ -596,19 +450,17 @@ static int32_t tsdbFileDoWriteSttBlockData(STsdbFD *fd, SBlockData *blockData, i
tsdbWriterUpdVerRange(range, sttBlk->minVer, sttBlk->maxVer);
int32_t sizeArr[5] = {0};
code = tCmprBlockData(blockData, cmprAlg, NULL, NULL, bufArr, sizeArr);
code = tBlockDataCompress(blockData, cmprAlg, buffers, buffers + 4);
if (code) return code;
sttBlk->bInfo.offset = *fileSize;
sttBlk->bInfo.szKey = sizeArr[2] + sizeArr[3];
sttBlk->bInfo.szBlock = sizeArr[0] + sizeArr[1] + sttBlk->bInfo.szKey;
sttBlk->bInfo.szKey = buffers[0].size + buffers[1].size;
sttBlk->bInfo.szBlock = buffers[2].size + buffers[3].size + sttBlk->bInfo.szKey;
for (int32_t i = 3; i >= 0; i--) {
if (sizeArr[i]) {
code = tsdbWriteFile(fd, *fileSize, bufArr[i], sizeArr[i]);
for (int i = 0; i < 4; i++) {
if (buffers[i].size) {
code = tsdbWriteFile(fd, *fileSize, buffers[i].data, buffers[i].size);
if (code) return code;
*fileSize += sizeArr[i];
*fileSize += buffers[i].size;
}
}
@ -627,7 +479,7 @@ static int32_t tsdbSttFileDoWriteBlockData(SSttFileWriter *writer) {
int32_t lino = 0;
code = tsdbFileDoWriteSttBlockData(writer->fd, writer->blockData, writer->config->cmprAlg, &writer->file->size,
writer->sttBlkArray, writer->config->bufArr, &writer->ctx->range);
writer->sttBlkArray, writer->buffers, &writer->ctx->range);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
@ -736,7 +588,7 @@ static int32_t tsdbSttFileDoWriteTombBlock(SSttFileWriter *writer) {
int32_t lino = 0;
code = tsdbFileWriteTombBlock(writer->fd, writer->tombBlock, writer->config->cmprAlg, &writer->file->size,
writer->tombBlkArray, writer->config->bufArr, &writer->ctx->range);
writer->tombBlkArray, writer->buffers, &writer->ctx->range);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
@ -827,7 +679,10 @@ static int32_t tsdbSttFWriterDoOpen(SSttFileWriter *writer) {
// set
if (!writer->config->skmTb) writer->config->skmTb = writer->skmTb;
if (!writer->config->skmRow) writer->config->skmRow = writer->skmRow;
if (!writer->config->bufArr) writer->config->bufArr = writer->bufArr;
writer->buffers = writer->config->buffers;
if (writer->buffers == NULL) {
writer->buffers = writer->local;
}
writer->file[0] = (STFile){
.type = TSDB_FTYPE_STT,
@ -871,8 +726,8 @@ _exit:
static void tsdbSttFWriterDoClose(SSttFileWriter *writer) {
ASSERT(writer->fd == NULL);
for (int32_t i = 0; i < ARRAY_SIZE(writer->bufArr); ++i) {
tFree(writer->bufArr[i]);
for (int32_t i = 0; i < ARRAY_SIZE(writer->local); ++i) {
tBufferDestroy(writer->local + i);
}
tDestroyTSchema(writer->skmRow->pTSchema);
tDestroyTSchema(writer->skmTb->pTSchema);

View File

@ -54,10 +54,10 @@ int32_t tsdbSttFileReadStatisBlock(SSttFileReader *reader, const SStatisBlk *sta
int32_t tsdbSttFileReadTombBlock(SSttFileReader *reader, const STombBlk *delBlk, STombBlock *dData);
struct SSttFileReaderConfig {
STsdb *tsdb;
int32_t szPage;
STFile file[1];
uint8_t **bufArr;
STsdb *tsdb;
int32_t szPage;
STFile file[1];
SBuffer *buffers;
};
// SSttFileWriter ==========================================