more code

This commit is contained in:
Hongze Cheng 2024-03-07 13:14:56 +08:00
parent 4dbdd2e35b
commit 456db5b4c3
3 changed files with 63 additions and 21 deletions

View File

@ -326,6 +326,7 @@ STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version);
pTSchema = NULL; \ pTSchema = NULL; \
} \ } \
} while (0) } while (0)
const STColumn *tTSchemaSearchColumn(const STSchema *pTSchema, int16_t cid);
struct SValueColumn { struct SValueColumn {
int8_t type; int8_t type;

View File

@ -1694,6 +1694,24 @@ STSchema *tBuildTSchema(SSchema *aSchema, int32_t numOfCols, int32_t version) {
return pTSchema; return pTSchema;
} }
static int32_t tTColumnCompare(const void *p1, const void *p2) {
if (((STColumn *)p1)->colId < ((STColumn *)p2)->colId) {
return -1;
} else if (((STColumn *)p1)->colId > ((STColumn *)p2)->colId) {
return 1;
}
return 0;
}
const STColumn *tTSchemaSearchColumn(const STSchema *pTSchema, int16_t cid) {
STColumn tcol = {
.colId = cid,
};
return taosbsearch(&tcol, pTSchema->columns, pTSchema->numOfCols, sizeof(STColumn), tTColumnCompare, TD_EQ);
}
// SColData ======================================== // SColData ========================================
void tColDataDestroy(void *ph) { void tColDataDestroy(void *ph) {
if (ph) { if (ph) {

View File

@ -295,18 +295,21 @@ _exit:
int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData, int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRecord *record, SBlockData *bData,
STSchema *pTSchema, int16_t cids[], int32_t ncid) { STSchema *pTSchema, int16_t cids[], int32_t ncid) {
int32_t code = 0; int32_t code = 0;
int32_t lino = 0; int32_t lino = 0;
SDiskDataHdr hdr; SDiskDataHdr hdr;
SBuffer *buffer0 = reader->buffers + 0;
SBuffer *buffer1 = reader->buffers + 1;
SBuffer *assist = reader->buffers + 2;
// load key part // load key part
tBufferClear(&reader->buffers[0]); tBufferClear(buffer0);
code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockKeySize, code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset, record->blockKeySize, buffer0, 0);
&reader->buffers[0], 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// SDiskDataHdr // SDiskDataHdr
SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); SBufferReader br = BUFFER_READER_INITIALIZER(0, buffer0);
code = tGetDiskDataHdr(&br, &hdr); code = tGetDiskDataHdr(&br, &hdr);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
@ -318,35 +321,42 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
bData->nRow = hdr.nRow; bData->nRow = hdr.nRow;
// Key part // Key part
code = tBlockDataDecompressKeyPart(&hdr, &br, bData, reader->buffers + 1); code = tBlockDataDecompressKeyPart(&hdr, &br, bData, assist);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(br.offset == reader->buffers[0].size); ASSERT(br.offset == buffer0->size);
if (ncid == 0) { bool loadExtra = false;
for (int i = 0; i < ncid; i++) {
if (tBlockDataGetColData(bData, cids[i]) == NULL) {
loadExtra = true;
break;
}
}
if (!loadExtra) {
goto _exit; goto _exit;
} }
// load SBlockCol part // load SBlockCol part
tBufferClear(&reader->buffers[0]); tBufferClear(buffer0);
code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize, hdr.szBlkCol, code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], record->blockOffset + record->blockKeySize, hdr.szBlkCol,
&reader->buffers[0], 0); buffer0, 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// load each column // load each column
SBlockCol blockCol = { SBlockCol blockCol = {
.cid = 0, .cid = 0,
}; };
br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); br = BUFFER_READER_INITIALIZER(0, buffer0);
for (int32_t i = 0; i < ncid; i++) { for (int32_t i = 0; i < ncid; i++) {
int16_t cid = cids[i]; int16_t cid = cids[i];
if (tBlockDataGetColData(bData, cid)) { if (tBlockDataGetColData(bData, cid)) { // already loaded
// this column has been loaded
continue; continue;
} }
while (cid > blockCol.cid) { while (cid > blockCol.cid) {
if (br.offset >= reader->buffers[0].size) { if (br.offset >= buffer0->size) {
blockCol.cid = INT16_MAX; blockCol.cid = INT16_MAX;
break; break;
} }
@ -356,19 +366,32 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
} }
if (cid < blockCol.cid) { if (cid < blockCol.cid) {
// this column as NONE const STColumn *tcol = tTSchemaSearchColumn(pTSchema, cid);
continue; ASSERT(tcol);
SBlockCol none = {
.cid = cid,
.type = tcol->type,
.cflag = tcol->flags,
.flag = HAS_NONE,
.szOrigin = 0,
.szBitmap = 0,
.szOffset = 0,
.szValue = 0,
.offset = 0,
};
code = tBlockDataDecompressColData(&hdr, &none, &br, bData, assist);
TSDB_CHECK_CODE(code, lino, _exit);
} else if (cid == blockCol.cid) { } else if (cid == blockCol.cid) {
// load from file // load from file
tBufferClear(&reader->buffers[1]); tBufferClear(buffer1);
code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA], code = tsdbReadFileToBuffer(reader->fd[TSDB_FTYPE_DATA],
record->blockOffset + record->blockKeySize + hdr.szBlkCol + blockCol.offset, record->blockOffset + record->blockKeySize + hdr.szBlkCol + blockCol.offset,
blockCol.szBitmap + blockCol.szOffset + blockCol.szValue, &reader->buffers[1], 0); blockCol.szBitmap + blockCol.szOffset + blockCol.szValue, buffer1, 0);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
// decode the buffer // decode the buffer
SBufferReader br1 = BUFFER_READER_INITIALIZER(0, &reader->buffers[1]); SBufferReader br1 = BUFFER_READER_INITIALIZER(0, buffer1);
code = tBlockDataDecompressColData(&hdr, &blockCol, &br1, bData, reader->buffers + 2); code = tBlockDataDecompressColData(&hdr, &blockCol, &br1, bData, assist);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
} }