add alg to block col

This commit is contained in:
yihaoDeng 2024-03-14 12:37:15 +00:00
parent 646478c6dc
commit ec227180e1
4 changed files with 31 additions and 22 deletions

View File

@ -94,7 +94,7 @@ typedef struct STsdbRowKey STsdbRowKey;
#define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM))
#define LOGIC_TO_FILE_OFFSET(LOFFSET, PAGE) \
((LOFFSET) / PAGE_CONTENT_SIZE(PAGE) * (PAGE) + (LOFFSET) % PAGE_CONTENT_SIZE(PAGE))
#define FILE_TO_LOGIC_OFFSET(OFFSET, PAGE) ((OFFSET) / (PAGE) * PAGE_CONTENT_SIZE(PAGE) + (OFFSET) % (PAGE))
#define FILE_TO_LOGIC_OFFSET(OFFSET, PAGE) ((OFFSET) / (PAGE)*PAGE_CONTENT_SIZE(PAGE) + (OFFSET) % (PAGE))
#define PAGE_OFFSET(PGNO, PAGE) (((PGNO)-1) * (PAGE))
#define OFFSET_PGNO(OFFSET, PAGE) ((OFFSET) / (PAGE) + 1)
@ -143,8 +143,8 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
#define MIN_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) < 0) ? (KEY1) : (KEY2))
#define MAX_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) > 0) ? (KEY1) : (KEY2))
// SBlockCol
int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol);
int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol);
int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver);
int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver);
int32_t tBlockColCmprFn(const void *p1, const void *p2);
// SDataBlk
void tDataBlkReset(SDataBlk *pBlock);
@ -441,15 +441,16 @@ struct SMapData {
};
struct SBlockCol {
int16_t cid;
int8_t type;
int8_t cflag;
int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE
int32_t szOrigin; // original column value size (only save for variant data type)
int32_t szBitmap; // bitmap size, 0 only for flag == HAS_VAL
int32_t szOffset; // offset size, 0 only for non-variant-length type
int32_t szValue; // value size, 0 when flag == (HAS_NULL | HAS_NONE)
int32_t offset;
int16_t cid;
int8_t type;
int8_t cflag;
int8_t flag; // HAS_NONE|HAS_NULL|HAS_VALUE
int32_t szOrigin; // original column value size (only save for variant data type)
int32_t szBitmap; // bitmap size, 0 only for flag == HAS_VAL
int32_t szOffset; // offset size, 0 only for non-variant-length type
int32_t szValue; // value size, 0 when flag == (HAS_NULL | HAS_NONE)
int32_t offset;
uint32_t alg;
};
struct SBlockInfo {

View File

@ -361,7 +361,7 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
break;
}
code = tGetBlockCol(&br, &blockCol);
code = tGetBlockCol(&br, &blockCol, hdr.fmtVer);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -891,7 +891,6 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
.numRow = bData->nRow,
.count = 1,
}};
tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &record->firstKey);
tsdbRowGetKey(&tsdbRowFromBlockData(bData, bData->nRow - 1), &record->lastKey);

View File

@ -264,7 +264,7 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
break;
}
code = tGetBlockCol(&br, &blockCol);
code = tGetBlockCol(&br, &blockCol, hdr.fmtVer);
TSDB_CHECK_CODE(code, lino, _exit);
}

View File

@ -382,7 +382,10 @@ int32_t tGetSttBlk(uint8_t *p, void *ph) {
}
// SBlockCol ======================================================
int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol) {
static const int32_t BLOCK_WITH_ALG_VER = 1;
int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver) {
int32_t code;
ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE));
@ -408,11 +411,13 @@ int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol) {
if ((code = tBufferPutI32v(buffer, pBlockCol->offset))) return code;
}
if (ver >= BLOCK_WITH_ALG_VER) {
if ((code = tBufferPutU32(buffer, pBlockCol->alg))) return code;
}
return 0;
}
int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol) {
int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver) {
int32_t code;
if ((code = tBufferGetI16v(br, &pBlockCol->cid))) return code;
@ -444,6 +449,10 @@ int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol) {
if ((code = tBufferGetI32v(br, &pBlockCol->offset))) return code;
}
if (ver >= BLOCK_WITH_ALG_VER) {
if ((code = tBufferGetU32(br, &pBlockCol->alg))) return code;
}
return 0;
}
@ -1449,7 +1458,7 @@ int32_t tBlockDataCompress(SBlockData *bData, int8_t cmprAlg, SBuffer *buffers,
.offset = offset,
};
code = tPutBlockCol(&buffers[2], &blockCol);
code = tPutBlockCol(&buffers[2], &blockCol, hdr.fmtVer);
TSDB_CHECK_CODE(code, lino, _exit);
}
hdr.szBlkCol = buffers[2].size;
@ -1488,7 +1497,7 @@ int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *
for (uint32_t startOffset = br2.offset; br2.offset - startOffset < hdr.szBlkCol;) {
SBlockCol blockCol;
code = tGetBlockCol(&br2, &blockCol);
code = tGetBlockCol(&br2, &blockCol, hdr.fmtVer);
TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist);
TSDB_CHECK_CODE(code, lino, _exit);
@ -1515,7 +1524,7 @@ int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr) {
if (pHdr->fmtVer == 1) {
if ((code = tBufferPutI8(buffer, pHdr->numOfPKs))) return code;
for (int i = 0; i < pHdr->numOfPKs; i++) {
if ((code = tPutBlockCol(buffer, &pHdr->primaryBlockCols[i]))) return code;
if ((code = tPutBlockCol(buffer, &pHdr->primaryBlockCols[i], pHdr->fmtVer))) return code;
}
}
@ -1538,7 +1547,7 @@ int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr) {
if (pHdr->fmtVer == 1) {
if ((code = tBufferGetI8(br, &pHdr->numOfPKs))) return code;
for (int i = 0; i < pHdr->numOfPKs; i++) {
if ((code = tGetBlockCol(br, &pHdr->primaryBlockCols[i]))) return code;
if ((code = tGetBlockCol(br, &pHdr->primaryBlockCols[i], pHdr->fmtVer))) return code;
}
} else {
pHdr->numOfPKs = 0;