more code

This commit is contained in:
Hongze Cheng 2024-03-05 15:43:46 +08:00
parent b0d9c2632f
commit 8aadc70b06
11 changed files with 363 additions and 513 deletions

View File

@ -142,8 +142,8 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
#define MIN_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) < 0) ? (KEY1) : (KEY2))
#define MAX_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) > 0) ? (KEY1) : (KEY2))
// SBlockCol
int32_t tPutBlockCol(uint8_t *p, void *ph);
int32_t tGetBlockCol(uint8_t *p, void *ph);
int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol);
int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol);
int32_t tBlockColCmprFn(const void *p1, const void *p2);
// SDataBlk
void tDataBlkReset(SDataBlk *pBlock);
@ -175,15 +175,14 @@ int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS
int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid);
int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid);
void tBlockDataClear(SBlockData *pBlockData);
int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[],
int32_t aBufN[]);
int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]);
int32_t tBlockDataCompress(SBlockData *bData, int8_t cmprAlg, SBuffer *buffers, SBuffer *assist);
int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist);
SColData *tBlockDataGetColData(SBlockData *pBlockData, int16_t cid);
int32_t tBlockDataAddColData(SBlockData *pBlockData, int16_t cid, int8_t type, int8_t cflag, SColData **ppColData);
// SDiskDataHdr
int32_t tPutDiskDataHdr(uint8_t *p, const SDiskDataHdr *pHdr);
int32_t tGetDiskDataHdr(uint8_t *p, void *ph);
int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr);
int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr);
// SDelIdx
int32_t tPutDelIdx(uint8_t *p, void *ph);
int32_t tGetDelIdx(uint8_t *p, void *ph);
@ -209,8 +208,8 @@ int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision);
void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey);
int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t nowSec);
int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline);
int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg);
int32_t tPutColumnDataAgg(SBuffer *buffer, SColumnDataAgg *pColAgg);
int32_t tGetColumnDataAgg(SBufferReader *br, SColumnDataAgg *pColAgg);
int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut,
int32_t *szOut, uint8_t **ppBuf);
int32_t tsdbDecmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t szOut,
@ -269,11 +268,6 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader);
int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx);
int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk);
int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk);
int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg);
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData);
int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData);
int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData);
int32_t tsdbReadSttBlockEx(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData);
// SDelFReader
int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb);
int32_t tsdbDelFReaderClose(SDelFReader **ppReader);
@ -561,7 +555,10 @@ struct SDiskDataHdr {
int32_t szBlkCol;
int32_t nRow;
int8_t cmprAlg;
int8_t numOfPKs;
// fmtVer == 1
int8_t numOfPKs;
SBlockCol primaryBlockCols[TD_MAX_PK_COLS];
};
struct SDelFile {

View File

@ -2175,7 +2175,7 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea
STombRecord record = {0};
bool finished = false;
for (int32_t k = 0; k < TARRAY2_SIZE(block.suid); ++k) {
for (int32_t k = 0; k < TOMB_BLOCK_SIZE(&block); ++k) {
code = tTombBlockGet(&block, k, &record);
if (code != TSDB_CODE_SUCCESS) {
finished = true;

View File

@ -326,7 +326,8 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
// SDiskDataHdr
SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]);
br.offset += tGetDiskDataHdr((uint8_t *)BR_PTR(&br), &hdr);
code = tGetDiskDataHdr(&br, &hdr);
TSDB_CHECK_CODE(code, lino, _exit);
tBlockDataReset(bData);
bData->suid = hdr.suid;
@ -368,7 +369,8 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
break;
}
br.offset += tGetBlockCol((uint8_t *)BR_PTR(&br), &blockCol);
code = tGetBlockCol(&br, &blockCol);
TSDB_CHECK_CODE(code, lino, _exit);
}
if (cid < blockCol.cid) {
@ -412,7 +414,8 @@ int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *rec
while (br.offset < record->smaSize) {
SColumnDataAgg sma[1];
br.offset += tGetColumnDataAgg((uint8_t *)BR_PTR(&br), sma);
code = tGetColumnDataAgg(&br, sma);
TSDB_CHECK_CODE(code, lino, _exit);
code = TARRAY2_APPEND_PTR(columnDataAggArray, sma);
TSDB_CHECK_CODE(code, lino, _exit);
@ -705,32 +708,35 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl
int32_t code;
SBrinBlk brinBlk = {
.dp[0] =
{
.offset = *fileSize,
.size = 0,
},
.numRec = brinBlock->numOfRecords,
.numOfPKs = brinBlock->numOfPKs,
.cmprAlg = cmprAlg,
};
for (int i = 0; i < brinBlock->numOfRecords; i++) {
SBrinRecord record;
brinBlk.dp->offset = *fileSize;
brinBlk.dp->size = 0;
// minTbid
code = tBufferGet(&brinBlock->suids, 0, sizeof(int64_t), &brinBlk.minTbid.suid);
if (code) return code;
code = tBufferGet(&brinBlock->uids, 0, sizeof(int64_t), &brinBlk.minTbid.uid);
if (code) return code;
// maxTbid
code = tBufferGet(&brinBlock->suids, brinBlock->numOfRecords - 1, sizeof(int64_t), &brinBlk.maxTbid.suid);
if (code) return code;
code = tBufferGet(&brinBlock->uids, brinBlock->numOfRecords - 1, sizeof(int64_t), &brinBlk.maxTbid.uid);
if (code) return code;
// minVer and maxVer
const int64_t *minVers = (int64_t *)tBufferGetData(&brinBlock->minVers);
const int64_t *maxVers = (int64_t *)tBufferGetData(&brinBlock->maxVers);
brinBlk.minVer = minVers[0];
brinBlk.maxVer = maxVers[0];
for (int32_t i = 1; i < brinBlock->numOfRecords; i++) {
brinBlk.minVer = TMIN(brinBlk.minVer, minVers[i]);
brinBlk.maxVer = TMAX(brinBlk.maxVer, maxVers[i]);
tBrinBlockGet(brinBlock, i, &record);
if (i == 0) {
brinBlk.minTbid.suid = record.suid;
brinBlk.minTbid.uid = record.uid;
brinBlk.minVer = record.minVer;
brinBlk.maxVer = record.maxVer;
}
if (i == brinBlock->numOfRecords - 1) {
brinBlk.maxTbid.suid = record.suid;
brinBlk.maxTbid.uid = record.uid;
}
if (record.minVer < brinBlk.minVer) {
brinBlk.minVer = record.minVer;
}
if (record.maxVer > brinBlk.maxVer) {
brinBlk.maxVer = record.maxVer;
}
}
tsdbWriterUpdVerRange(range, brinBlk.minVer, brinBlk.maxVer);
@ -764,6 +770,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl
// write primary keys to file
if (brinBlock->numOfPKs > 0) {
#if 0
SBufferWriter writer;
SValueColumnCompressInfo vcinfo = {.cmprAlg = cmprAlg};
@ -798,7 +805,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl
// if (code) return code;
// *fileSize += tBufferGetSize(NULL);
// brinBlk->dp->size += tBufferGetSize(NULL);
tBufferWriterDestroy(writer);
#endif
}
// append to brinBlkArray
@ -832,8 +839,18 @@ static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinR
int32_t code = 0;
int32_t lino = 0;
code = tBrinBlockPut(writer->brinBlock, record);
TSDB_CHECK_CODE(code, lino, _exit);
for (;;) {
code = tBrinBlockPut(writer->brinBlock, record);
if (code == TSDB_CODE_INVALID_PARA) {
// different records with different primary keys
code = tsdbDataFileWriteBrinBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
continue;
} else {
TSDB_CHECK_CODE(code, lino, _exit);
}
break;
}
if ((writer->brinBlock->numOfRecords) >= writer->config->maxRow) {
code = tsdbDataFileWriteBrinBlock(writer);
@ -873,7 +890,7 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
tsdbRowGetKey(&tsdbRowFromBlockData(bData, bData->nRow - 1), &record->lastKey);
for (int32_t i = 1; i < bData->nRow; ++i) {
if (bData->aTSKEY[i] != bData->aTSKEY[i - 1]) {
if (tsdbRowCmprFn(&tsdbRowFromBlockData(bData, i - 1), &tsdbRowFromBlockData(bData, i)) != 0) {
record->count++;
}
if (bData->aVersion[i] < record->minVer) {
@ -887,24 +904,21 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
tsdbWriterUpdVerRange(&writer->ctx->range, record->minVer, record->maxVer);
// to .data file
int32_t sizeArr[5] = {0};
code = tCmprBlockData(bData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, sizeArr);
code = tBlockDataCompress(bData, writer->config->cmprAlg, writer->buffers, &writer->buffers[4]);
TSDB_CHECK_CODE(code, lino, _exit);
record->blockKeySize = sizeArr[3] + sizeArr[2];
record->blockSize = sizeArr[0] + sizeArr[1] + record->blockKeySize;
record->blockKeySize = writer->buffers[0].size + writer->buffers[1].size;
record->blockSize = record->blockKeySize + writer->buffers[2].size + writer->buffers[3].size;
for (int32_t i = 3; i >= 0; --i) {
if (sizeArr[i]) {
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size, writer->config->bufArr[i],
sizeArr[i]);
TSDB_CHECK_CODE(code, lino, _exit);
writer->files[TSDB_FTYPE_DATA].size += sizeArr[i];
}
for (int i = 0; i < 4; i++) {
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size, writer->buffers[i].data,
tBufferGetSize(&writer->buffers[i]));
TSDB_CHECK_CODE(code, lino, _exit);
writer->files[TSDB_FTYPE_DATA].size += tBufferGetSize(&writer->buffers[i]);
}
// to .sma file
tBufferClear(&writer->buffers[0]);
for (int32_t i = 0; i < bData->nColData; ++i) {
SColData *colData = bData->aColData + i;
if ((colData->cflag & COL_SMA_ON) == 0 || ((colData->flag & HAS_VALUE) == 0)) continue;
@ -912,17 +926,13 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
SColumnDataAgg sma[1] = {{.colId = colData->cid}};
tColDataCalcSMA[colData->type](colData, &sma->sum, &sma->max, &sma->min, &sma->numOfNull);
int32_t size = tPutColumnDataAgg(NULL, sma);
code = tRealloc(&writer->config->bufArr[0], record->smaSize + size);
code = tPutColumnDataAgg(&writer->buffers[0], sma);
TSDB_CHECK_CODE(code, lino, _exit);
tPutColumnDataAgg(writer->config->bufArr[0] + record->smaSize, sma);
record->smaSize += size;
}
record->smaSize = tBufferGetSize(&writer->buffers[0]);
if (record->smaSize > 0) {
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record->smaOffset, writer->config->bufArr[0], record->smaSize);
code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record->smaOffset, writer->buffers[0].data, record->smaSize);
TSDB_CHECK_CODE(code, lino, _exit);
writer->files[TSDB_FTYPE_SMA].size += record->smaSize;
}
@ -973,7 +983,7 @@ _exit:
return code;
}
static int32_t tsdbRowKeyCmprNullAsLargest(const STsdbRowKey *key1, const STsdbRowKey *key2) {
static FORCE_INLINE int32_t tsdbRowKeyCmprNullAsLargest(const STsdbRowKey *key1, const STsdbRowKey *key2) {
if (key1 == NULL) {
return 1;
} else if (key2 == NULL) {
@ -1100,7 +1110,7 @@ static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) {
int32_t lino = 0;
if (writer->ctx->tbHasOldData) {
code = tsdbDataFileDoWriteTableOldData(writer, NULL /* as largest key */);
code = tsdbDataFileDoWriteTableOldData(writer, NULL /* as the largest key */);
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(writer->ctx->tbHasOldData == false);
@ -1198,11 +1208,12 @@ int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFoote
}
int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
TTombBlkArray *tombBlkArray, uint8_t **bufArr, SVersionRange *range) {
TTombBlkArray *tombBlkArray, SBuffer *buffers, SVersionRange *range) {
int32_t code;
if (TOMB_BLOCK_SIZE(tombBlock) == 0) return 0;
#if 0
STombBlk tombBlk[1] = {{
.dp[0] =
{
@ -1224,31 +1235,60 @@ int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAl
.numRec = TOMB_BLOCK_SIZE(tombBlock),
.cmprAlg = cmprAlg,
}};
#endif
STombBlk tombBlk = {
.dp[0] =
{
.offset = *fileSize,
.size = 0,
},
.numRec = TOMB_BLOCK_SIZE(tombBlock),
.cmprAlg = cmprAlg,
};
for (int i = 0; i < TOMB_BLOCK_SIZE(tombBlock); i++) {
STombRecord record;
tTombBlockGet(tombBlock, i, &record);
for (int32_t i = 1; i < TOMB_BLOCK_SIZE(tombBlock); i++) {
if (tombBlk->minVer > TARRAY2_GET(tombBlock->version, i)) {
tombBlk->minVer = TARRAY2_GET(tombBlock->version, i);
if (i == 0) {
tombBlk.minTbid.suid = record.suid;
tombBlk.minTbid.uid = record.uid;
tombBlk.minVer = record.version;
tombBlk.maxVer = record.version;
}
if (tombBlk->maxVer < TARRAY2_GET(tombBlock->version, i)) {
tombBlk->maxVer = TARRAY2_GET(tombBlock->version, i);
if (i == TOMB_BLOCK_SIZE(tombBlock) - 1) {
tombBlk.maxTbid.suid = record.suid;
tombBlk.maxTbid.uid = record.uid;
}
if (record.version < tombBlk.minVer) {
tombBlk.minVer = record.version;
}
if (record.version > tombBlk.maxVer) {
tombBlk.maxVer = record.version;
}
}
tsdbWriterUpdVerRange(range, tombBlk->minVer, tombBlk->maxVer);
tsdbWriterUpdVerRange(range, tombBlk.minVer, tombBlk.maxVer);
for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->dataArr); i++) {
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&tombBlock->dataArr[i]), TARRAY2_DATA_LEN(&tombBlock->dataArr[i]),
TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg, &bufArr[0], 0, &tombBlk->size[i], &bufArr[1]);
for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->buffers); i++) {
tBufferClear(&buffers[0]);
SCompressInfo cinfo = {
.cmprAlg = cmprAlg,
.dataType = TSDB_DATA_TYPE_BIGINT,
.originalSize = tombBlock->buffers[i].size,
};
code = tCompressDataToBuffer(tombBlock->buffers[i].data, cinfo.originalSize, &cinfo, &buffers[0], &buffers[1]);
if (code) return code;
code = tsdbWriteFile(fd, *fileSize, bufArr[0], tombBlk->size[i]);
code = tsdbWriteFile(fd, *fileSize, buffers[0].data, buffers[0].size);
if (code) return code;
tombBlk->dp->size += tombBlk->size[i];
*fileSize += tombBlk->size[i];
tombBlk.size[i] = cinfo.compressedSize;
tombBlk.dp->size += tombBlk.size[i];
*fileSize += tombBlk.size[i];
}
code = TARRAY2_APPEND_PTR(tombBlkArray, tombBlk);
code = TARRAY2_APPEND_PTR(tombBlkArray, &tombBlk);
if (code) return code;
tTombBlockClear(tombBlock);
@ -1276,7 +1316,7 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) {
int32_t lino = 0;
code = tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg,
&writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr,
&writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->buffers,
&writer->ctx->tombRange);
TSDB_CHECK_CODE(code, lino, _exit);

View File

@ -102,7 +102,7 @@ int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFoote
// tomb
int32_t tsdbDataFileWriteTombRecord(SDataFileWriter *writer, const STombRecord *record);
int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize,
TTombBlkArray *tombBlkArray, uint8_t **bufArr, SVersionRange *range);
TTombBlkArray *tombBlkArray, SBuffer *buffers, SVersionRange *range);
int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize);
int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize);

View File

@ -254,9 +254,7 @@ _exit:
static int32_t tsdbDataTombIterNext(STsdbIter *iter, const TABLEID *tbid) {
while (!iter->noMoreData) {
for (; iter->dataTomb->tombBlockIdx < TOMB_BLOCK_SIZE(iter->dataTomb->tombBlock); iter->dataTomb->tombBlockIdx++) {
iter->record->suid = TARRAY2_GET(iter->dataTomb->tombBlock->suid, iter->dataTomb->tombBlockIdx);
iter->record->uid = TARRAY2_GET(iter->dataTomb->tombBlock->uid, iter->dataTomb->tombBlockIdx);
iter->record->version = TARRAY2_GET(iter->dataTomb->tombBlock->version, iter->dataTomb->tombBlockIdx);
tTombBlockGet(iter->dataTomb->tombBlock, iter->dataTomb->tombBlockIdx, iter->record);
if (iter->filterByVersion && (iter->record->version < iter->range[0] || iter->record->version > iter->range[1])) {
continue;
@ -265,9 +263,6 @@ static int32_t tsdbDataTombIterNext(STsdbIter *iter, const TABLEID *tbid) {
if (tbid && iter->record->suid == tbid->suid && iter->record->uid == tbid->uid) {
continue;
}
iter->record->skey = TARRAY2_GET(iter->dataTomb->tombBlock->skey, iter->dataTomb->tombBlockIdx);
iter->record->ekey = TARRAY2_GET(iter->dataTomb->tombBlock->ekey, iter->dataTomb->tombBlockIdx);
iter->dataTomb->tombBlockIdx++;
goto _exit;
}
@ -444,9 +439,7 @@ static int32_t tsdbMemTableIterClose(STsdbIter *iter) { return 0; }
static int32_t tsdbSttTombIterNext(STsdbIter *iter, const TABLEID *tbid) {
while (!iter->noMoreData) {
for (; iter->sttTomb->tombBlockIdx < TOMB_BLOCK_SIZE(iter->sttTomb->tombBlock); iter->sttTomb->tombBlockIdx++) {
iter->record->suid = TARRAY2_GET(iter->sttTomb->tombBlock->suid, iter->sttTomb->tombBlockIdx);
iter->record->uid = TARRAY2_GET(iter->sttTomb->tombBlock->uid, iter->sttTomb->tombBlockIdx);
iter->record->version = TARRAY2_GET(iter->sttTomb->tombBlock->version, iter->sttTomb->tombBlockIdx);
tTombBlockGet(iter->sttTomb->tombBlock, iter->sttTomb->tombBlockIdx, iter->record);
if (iter->filterByVersion && (iter->record->version < iter->range[0] || iter->record->version > iter->range[1])) {
continue;
@ -456,8 +449,6 @@ static int32_t tsdbSttTombIterNext(STsdbIter *iter, const TABLEID *tbid) {
continue;
}
iter->record->skey = TARRAY2_GET(iter->sttTomb->tombBlock->skey, iter->sttTomb->tombBlockIdx);
iter->record->ekey = TARRAY2_GET(iter->sttTomb->tombBlock->ekey, iter->sttTomb->tombBlockIdx);
iter->sttTomb->tombBlockIdx++;
goto _exit;
}

View File

@ -14,7 +14,6 @@
*/
#include "tsdbReadUtil.h"
#include "osDef.h"
#include "tsdb.h"
#include "tsdbDataFileRW.h"
#include "tsdbFS2.h"
@ -529,7 +528,7 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_
pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData));
}
for (int32_t k = 0; k < TARRAY2_SIZE(pBlock->suid); ++k) {
for (int32_t k = 0; k < pBlock->numOfRecords; ++k) {
code = tTombBlockGet(pBlock, k, &record);
if (code != TSDB_CODE_SUCCESS) {
*pRet = BLK_CHECK_QUIT;

View File

@ -632,235 +632,6 @@ _err:
return code;
}
int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aColumnDataAgg) {
int32_t code = 0;
SSmaInfo *pSmaInfo = &pDataBlk->smaInfo;
ASSERT(pSmaInfo->size > 0);
taosArrayClear(aColumnDataAgg);
// alloc
code = tRealloc(&pReader->aBuf[0], pSmaInfo->size);
if (code) goto _err;
// read
code = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], pSmaInfo->size, 0);
if (code) goto _err;
// decode
int32_t n = 0;
while (n < pSmaInfo->size) {
SColumnDataAgg sma;
n += tGetColumnDataAgg(pReader->aBuf[0] + n, &sma);
if (taosArrayPush(aColumnDataAgg, &sma) == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
}
ASSERT(n == pSmaInfo->size);
return code;
_err:
tsdbError("vgId:%d, tsdb read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, SBlockData *pBlockData,
int32_t iStt) {
int32_t code = 0;
tBlockDataClear(pBlockData);
STsdbFD *pFD = (iStt < 0) ? pReader->pDataFD : pReader->aSttFD[iStt];
// uid + version + tskey
code = tRealloc(&pReader->aBuf[0], pBlkInfo->szKey);
if (code) goto _err;
code = tsdbReadFile(pFD, pBlkInfo->offset, pReader->aBuf[0], pBlkInfo->szKey, 0);
if (code) goto _err;
SDiskDataHdr hdr;
uint8_t *p = pReader->aBuf[0] + tGetDiskDataHdr(pReader->aBuf[0], &hdr);
ASSERT(hdr.delimiter == TSDB_FILE_DLMT);
ASSERT(pBlockData->suid == hdr.suid);
pBlockData->uid = hdr.uid;
pBlockData->nRow = hdr.nRow;
// uid
if (hdr.uid == 0) {
ASSERT(hdr.szUid);
code = tsdbDecmprData(p, hdr.szUid, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aUid,
sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]);
if (code) goto _err;
} else {
ASSERT(!hdr.szUid);
}
p += hdr.szUid;
// version
code = tsdbDecmprData(p, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aVersion,
sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]);
if (code) goto _err;
p += hdr.szVer;
// TSKEY
code = tsdbDecmprData(p, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY,
sizeof(TSKEY) * hdr.nRow, &pReader->aBuf[1]);
if (code) goto _err;
p += hdr.szKey;
ASSERT(p - pReader->aBuf[0] == pBlkInfo->szKey);
// read and decode columns
if (pBlockData->nColData == 0) goto _exit;
if (hdr.szBlkCol > 0) {
int64_t offset = pBlkInfo->offset + pBlkInfo->szKey;
code = tRealloc(&pReader->aBuf[0], hdr.szBlkCol);
if (code) goto _err;
code = tsdbReadFile(pFD, offset, pReader->aBuf[0], hdr.szBlkCol, 0);
if (code) goto _err;
}
SBlockCol blockCol = {.cid = 0};
SBlockCol *pBlockCol = &blockCol;
int32_t n = 0;
for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) {
SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData);
while (pBlockCol && pBlockCol->cid < pColData->cid) {
if (n < hdr.szBlkCol) {
n += tGetBlockCol(pReader->aBuf[0] + n, pBlockCol);
} else {
ASSERT(n == hdr.szBlkCol);
pBlockCol = NULL;
}
}
if (pBlockCol == NULL || pBlockCol->cid > pColData->cid) {
// add a lot of NONE
for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type));
if (code) goto _err;
}
} else {
ASSERT(pBlockCol->type == pColData->type);
ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE);
if (pBlockCol->flag == HAS_NULL) {
// add a lot of NULL
for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) {
code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type));
if (code) goto _err;
}
} else {
// decode from binary
int64_t offset = pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + pBlockCol->offset;
int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue;
code = tRealloc(&pReader->aBuf[1], size);
if (code) goto _err;
code = tsdbReadFile(pFD, offset, pReader->aBuf[1], size, 0);
if (code) goto _err;
code = tsdbDecmprColData(pReader->aBuf[1], pBlockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->aBuf[2]);
if (code) goto _err;
}
}
}
_exit:
return code;
_err:
tsdbError("vgId:%d, tsdb read block data impl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
int32_t code = 0;
SBlockInfo *pBlockInfo = &pDataBlk->aSubBlock[0];
// alloc
code = tRealloc(&pReader->aBuf[0], pBlockInfo->szBlock);
if (code) goto _err;
// read
code = tsdbReadFile(pReader->pDataFD, pBlockInfo->offset, pReader->aBuf[0], pBlockInfo->szBlock, 0);
if (code) goto _err;
// decmpr
code = tDecmprBlockData(pReader->aBuf[0], pBlockInfo->szBlock, pBlockData, &pReader->aBuf[1]);
if (code) goto _err;
return code;
_err:
tsdbError("vgId:%d, tsdb read data block ex failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) {
int32_t code = 0;
code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], pBlockData, -1);
if (code) goto _err;
ASSERT(pDataBlk->nSubBlock == 1);
return code;
_err:
tsdbError("vgId:%d, tsdb read data block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code));
return code;
}
int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData) {
int32_t code = 0;
int32_t lino = 0;
code = tsdbReadBlockDataImpl(pReader, &pSttBlk->bInfo, pBlockData, iStt);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
int32_t tsdbReadSttBlockEx(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData) {
int32_t code = 0;
int32_t lino = 0;
// alloc
code = tRealloc(&pReader->aBuf[0], pSttBlk->bInfo.szBlock);
TSDB_CHECK_CODE(code, lino, _exit);
// read
code = tsdbReadFile(pReader->aSttFD[iStt], pSttBlk->bInfo.offset, pReader->aBuf[0], pSttBlk->bInfo.szBlock, 0);
TSDB_CHECK_CODE(code, lino, _exit);
// decmpr
code = tDecmprBlockData(pReader->aBuf[0], pSttBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
if (code) {
tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code));
}
return code;
}
// SDelFReader ====================================================
struct SDelFReader {
STsdb *pTsdb;

View File

@ -449,12 +449,12 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *
struct {
// context
bool toStt;
int8_t cmprAlg;
int32_t maxRow;
int64_t minKey;
int64_t maxKey;
uint8_t *bufArr[8];
bool toStt;
int8_t cmprAlg;
int32_t maxRow;
int64_t minKey;
int64_t maxKey;
SBuffer buffers[8];
// reader
SArray *aDelData;
// writer
@ -503,7 +503,7 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *
}
SVersionRange tombRange = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray,
ctx->bufArr, &tombRange);
ctx->buffers, &tombRange);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
@ -516,7 +516,7 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *
}
SVersionRange tombRange = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray,
ctx->bufArr, &tombRange);
ctx->buffers, &tombRange);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -545,8 +545,8 @@ _exit:
if (code) {
TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code);
}
for (int32_t i = 0; i < ARRAY_SIZE(ctx->bufArr); i++) {
tFree(ctx->bufArr[i]);
for (int32_t i = 0; i < ARRAY_SIZE(ctx->buffers); i++) {
tBufferDestroy(ctx->buffers + i);
}
TARRAY2_DESTROY(ctx->tombBlkArray, NULL);
tTombBlockDestroy(ctx->tombBlock);

View File

@ -380,47 +380,44 @@ int32_t tGetSttBlk(uint8_t *p, void *ph) {
}
// SBlockCol ======================================================
int32_t tPutBlockCol(uint8_t *p, void *ph) {
int32_t n = 0;
SBlockCol *pBlockCol = (SBlockCol *)ph;
int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol) {
int32_t code;
ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE));
n += tPutI16v(p ? p + n : p, pBlockCol->cid);
n += tPutI8(p ? p + n : p, pBlockCol->type);
n += tPutI8(p ? p + n : p, pBlockCol->cflag);
n += tPutI8(p ? p + n : p, pBlockCol->flag);
n += tPutI32v(p ? p + n : p, pBlockCol->szOrigin);
if ((code = tBufferPutI16v(buffer, pBlockCol->cid))) return code;
if ((code = tBufferPutI8(buffer, pBlockCol->type))) return code;
if ((code = tBufferPutI8(buffer, pBlockCol->cflag))) return code;
if ((code = tBufferPutI8(buffer, pBlockCol->flag))) return code;
if ((code = tBufferPutI32v(buffer, pBlockCol->szOrigin))) return code;
if (pBlockCol->flag != HAS_NULL) {
if (pBlockCol->flag != HAS_VALUE) {
n += tPutI32v(p ? p + n : p, pBlockCol->szBitmap);
if ((code = tBufferPutI32v(buffer, pBlockCol->szBitmap))) return code;
}
if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
n += tPutI32v(p ? p + n : p, pBlockCol->szOffset);
if ((code = tBufferPutI32v(buffer, pBlockCol->szOffset))) return code;
}
if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) {
n += tPutI32v(p ? p + n : p, pBlockCol->szValue);
if ((code = tBufferPutI32v(buffer, pBlockCol->szValue))) return code;
}
n += tPutI32v(p ? p + n : p, pBlockCol->offset);
if ((code = tBufferPutI32v(buffer, pBlockCol->offset))) return code;
}
_exit:
return n;
return 0;
}
int32_t tGetBlockCol(uint8_t *p, void *ph) {
int32_t n = 0;
SBlockCol *pBlockCol = (SBlockCol *)ph;
int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol) {
int32_t code;
n += tGetI16v(p + n, &pBlockCol->cid);
n += tGetI8(p + n, &pBlockCol->type);
n += tGetI8(p + n, &pBlockCol->cflag);
n += tGetI8(p + n, &pBlockCol->flag);
n += tGetI32v(p + n, &pBlockCol->szOrigin);
if ((code = tBufferGetI16v(br, &pBlockCol->cid))) return code;
if ((code = tBufferGetI8(br, &pBlockCol->type))) return code;
if ((code = tBufferGetI8(br, &pBlockCol->cflag))) return code;
if ((code = tBufferGetI8(br, &pBlockCol->flag))) return code;
if ((code = tBufferGetI32v(br, &pBlockCol->szOrigin))) return code;
ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE));
@ -431,21 +428,21 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) {
if (pBlockCol->flag != HAS_NULL) {
if (pBlockCol->flag != HAS_VALUE) {
n += tGetI32v(p + n, &pBlockCol->szBitmap);
if ((code = tBufferGetI32v(br, &pBlockCol->szBitmap))) return code;
}
if (IS_VAR_DATA_TYPE(pBlockCol->type)) {
n += tGetI32v(p + n, &pBlockCol->szOffset);
if ((code = tBufferGetI32v(br, &pBlockCol->szOffset))) return code;
}
if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) {
n += tGetI32v(p + n, &pBlockCol->szValue);
if ((code = tBufferGetI32v(br, &pBlockCol->szValue))) return code;
}
n += tGetI32v(p + n, &pBlockCol->offset);
if ((code = tBufferGetI32v(br, &pBlockCol->offset))) return code;
}
return n;
return 0;
}
#ifdef BUILD_NO_CALL
@ -1381,6 +1378,7 @@ SColData *tBlockDataGetColData(SBlockData *pBlockData, int16_t cid) {
return NULL;
}
#if 0
int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[],
int32_t aBufSize[]) {
int32_t code = 0;
@ -1590,15 +1588,148 @@ int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uin
_exit:
return code;
}
#endif
int32_t tBlockDataCompress(SBlockData *bData, SBuffer *buffer, SBuffer *assist) {
int32_t code = 0;
SDiskDataHdr hdr = {0};
static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist) {
int32_t code = 0;
int32_t lino = 0;
SCompressInfo cinfo;
// SDiskDataHdr
// br->offset += tGetDiskDataHdr((uint8_t *)tBufferGetDataAt(br->buffer, br->offset), &hdr);
// uid
if (bData->uid == 0) {
cinfo = (SCompressInfo){
.cmprAlg = hdr->cmprAlg,
.dataType = TSDB_DATA_TYPE_BIGINT,
.originalSize = sizeof(int64_t) * bData->nRow,
};
code = tCompressDataToBuffer(bData->aUid, cinfo.originalSize, &cinfo, buffer, assist);
TSDB_CHECK_CODE(code, lino, _exit);
hdr->szUid = cinfo.compressedSize;
}
tBlockDataReset(bData);
// version
cinfo = (SCompressInfo){
.cmprAlg = hdr->cmprAlg,
.dataType = TSDB_DATA_TYPE_BIGINT,
.originalSize = sizeof(int64_t) * bData->nRow,
};
code = tCompressDataToBuffer((uint8_t *)bData->aVersion, cinfo.originalSize, &cinfo, buffer, assist);
TSDB_CHECK_CODE(code, lino, _exit);
hdr->szVer = cinfo.compressedSize;
// ts
cinfo = (SCompressInfo){
.cmprAlg = hdr->cmprAlg,
.dataType = TSDB_DATA_TYPE_TIMESTAMP,
.originalSize = sizeof(TSKEY) * bData->nRow,
};
code = tCompressDataToBuffer((uint8_t *)bData->aTSKEY, cinfo.originalSize, &cinfo, buffer, assist);
TSDB_CHECK_CODE(code, lino, _exit);
hdr->szKey = cinfo.compressedSize;
// primary keys
for (hdr->numOfPKs = 0; hdr->numOfPKs < bData->nColData; hdr->numOfPKs++) {
ASSERT(hdr->numOfPKs <= TD_MAX_PK_COLS);
SBlockCol *blockCol = &hdr->primaryBlockCols[hdr->numOfPKs];
SColData *colData = tBlockDataGetColDataByIdx(bData, hdr->numOfPKs);
if ((colData->cflag & COL_IS_KEY) == 0) {
break;
}
SColDataCompressInfo info = {
.cmprAlg = hdr->cmprAlg,
};
code = tColDataCompress(colData, &info, buffer, assist);
TSDB_CHECK_CODE(code, lino, _exit);
*blockCol = (SBlockCol){
.cid = info.columnId,
.type = info.dataType,
.cflag = info.columnFlag,
.flag = info.flag,
.szOrigin = info.dataOriginalSize,
.szBitmap = info.bitmapCompressedSize,
.szOffset = info.offsetCompressedSize,
.szValue = info.dataCompressedSize,
.offset = 0,
};
}
_exit:
return code;
}
/* buffers[0]: SDiskDataHdr
* buffers[1]: key part: uid + version + ts + primary keys
* buffers[2]: SBlockCol part
* buffers[3]: regular column part
*/
int32_t tBlockDataCompress(SBlockData *bData, int8_t cmprAlg, SBuffer *buffers, SBuffer *assist) {
int32_t code = 0;
int32_t lino = 0;
SDiskDataHdr hdr = {
.delimiter = TSDB_FILE_DLMT,
.fmtVer = 1,
.suid = bData->suid,
.uid = bData->uid,
.szUid = 0, // filled by compress key
.szVer = 0, // filled by compress key
.szKey = 0, // filled by compress key
.szBlkCol = 0, // filled by this func
.nRow = bData->nRow,
.cmprAlg = cmprAlg,
.numOfPKs = 0, // filled by compress key
};
// Key part
tBufferClear(&buffers[1]);
code = tBlockDataCompressKeyPart(bData, &hdr, &buffers[1], assist);
TSDB_CHECK_CODE(code, lino, _exit);
// Regulart column part
tBufferClear(&buffers[2]);
tBufferClear(&buffers[3]);
for (int i = 0; i < bData->nColData; i++) {
SColData *colData = tBlockDataGetColDataByIdx(bData, i);
if (colData->cflag & COL_IS_KEY) {
continue;
}
if (colData->flag == HAS_NONE) {
continue;
}
SColDataCompressInfo cinfo = {
.cmprAlg = cmprAlg,
};
int32_t offset = buffers[3].size;
code = tColDataCompress(colData, &cinfo, &buffers[3], assist);
TSDB_CHECK_CODE(code, lino, _exit);
SBlockCol blockCol = (SBlockCol){
.cid = cinfo.columnId,
.type = cinfo.dataType,
.cflag = cinfo.columnFlag,
.flag = cinfo.flag,
.szOrigin = cinfo.dataOriginalSize,
.szBitmap = cinfo.bitmapCompressedSize,
.szOffset = cinfo.offsetCompressedSize,
.szValue = cinfo.dataCompressedSize,
.offset = offset,
};
code = tPutBlockCol(&buffers[2], &blockCol);
TSDB_CHECK_CODE(code, lino, _exit);
}
hdr.szBlkCol = buffers[2].size;
// SDiskDataHdr part
tBufferClear(&buffers[0]);
code = tPutDiskDataHdr(&buffers[0], &hdr);
TSDB_CHECK_CODE(code, lino, _exit);
_exit:
return code;
@ -1700,18 +1831,14 @@ int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br,
br->offset += cinfo.compressedSize;
// primary keys
if (hdr->numOfPKs > 0) {
SBlockCol blockCol;
for (int i = 0; i < hdr->numOfPKs; i++) {
const SBlockCol *blockCol = &hdr->primaryBlockCols[i];
for (int i = 0; i < hdr->numOfPKs; i++) {
br->offset += tGetBlockCol((uint8_t *)BR_PTR(br), &blockCol);
ASSERT(blockCol->flag == HAS_VALUE);
ASSERT(blockCol->cflag & COL_IS_KEY);
ASSERT(blockCol.flag == HAS_VALUE);
ASSERT(blockCol.cflag & COL_IS_KEY);
code = tBlockDataDecompressColData(hdr, &blockCol, br, blockData, assist);
TSDB_CHECK_CODE(code, lino, _exit);
}
code = tBlockDataDecompressColData(hdr, blockCol, br, blockData, assist);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
@ -1725,7 +1852,8 @@ int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *
SCompressInfo cinfo;
// SDiskDataHdr
br->offset += tGetDiskDataHdr((uint8_t *)BR_PTR(br), &hdr);
code = tGetDiskDataHdr(br, &hdr);
TSDB_CHECK_CODE(code, lino, _exit);
tBlockDataReset(blockData);
blockData->suid = hdr.suid;
@ -1737,15 +1865,13 @@ int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *
TSDB_CHECK_CODE(code, lino, _exit);
// Column part
uint8_t *decodePtr = (uint8_t *)BR_PTR(br);
int32_t totalSize = 0;
SBufferReader br2 = *br;
br->offset += hdr.szBlkCol;
while (totalSize < hdr.szBlkCol) {
for (uint32_t startOffset = br2.offset; br2.offset - startOffset < hdr.szBlkCol;) {
SBlockCol blockCol;
int32_t size = tGetBlockCol(decodePtr, &blockCol);
decodePtr += size;
totalSize += size;
code = tGetBlockCol(&br2, &blockCol);
TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -1755,72 +1881,77 @@ _exit:
}
// SDiskDataHdr ==============================
int32_t tPutDiskDataHdr(uint8_t *p, const SDiskDataHdr *pHdr) {
int32_t n = 0;
int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr) {
int32_t code;
n += tPutU32(p ? p + n : p, pHdr->delimiter);
n += tPutU32v(p ? p + n : p, pHdr->fmtVer);
n += tPutI64(p ? p + n : p, pHdr->suid);
n += tPutI64(p ? p + n : p, pHdr->uid);
n += tPutI32v(p ? p + n : p, pHdr->szUid);
n += tPutI32v(p ? p + n : p, pHdr->szVer);
n += tPutI32v(p ? p + n : p, pHdr->szKey);
n += tPutI32v(p ? p + n : p, pHdr->szBlkCol);
n += tPutI32v(p ? p + n : p, pHdr->nRow);
n += tPutI8(p ? p + n : p, pHdr->cmprAlg);
if ((code = tBufferPutU32(buffer, pHdr->delimiter))) return code;
if ((code = tBufferPutU32v(buffer, pHdr->fmtVer))) return code;
if ((code = tBufferPutI64(buffer, pHdr->suid))) return code;
if ((code = tBufferPutI64(buffer, pHdr->uid))) return code;
if ((code = tBufferPutI32v(buffer, pHdr->szUid))) return code;
if ((code = tBufferPutI32v(buffer, pHdr->szVer))) return code;
if ((code = tBufferPutI32v(buffer, pHdr->szKey))) return code;
if ((code = tBufferPutI32v(buffer, pHdr->szBlkCol))) return code;
if ((code = tBufferPutI32v(buffer, pHdr->nRow))) return code;
if ((code = tBufferPutI8(buffer, pHdr->cmprAlg))) return code;
if (pHdr->fmtVer == 1) {
n += tPutI8(p ? p + n : p, pHdr->numOfPKs);
if ((code = tBufferPutI8(buffer, pHdr->numOfPKs))) return code;
for (int i = 0; i < pHdr->numOfPKs; i++) {
if ((code = tPutBlockCol(buffer, &pHdr->primaryBlockCols[i]))) return code;
}
}
return n;
return 0;
}
int32_t tGetDiskDataHdr(uint8_t *p, void *ph) {
int32_t n = 0;
SDiskDataHdr *pHdr = (SDiskDataHdr *)ph;
int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr) {
int32_t code;
n += tGetU32(p + n, &pHdr->delimiter);
n += tGetU32v(p + n, &pHdr->fmtVer);
n += tGetI64(p + n, &pHdr->suid);
n += tGetI64(p + n, &pHdr->uid);
n += tGetI32v(p + n, &pHdr->szUid);
n += tGetI32v(p + n, &pHdr->szVer);
n += tGetI32v(p + n, &pHdr->szKey);
n += tGetI32v(p + n, &pHdr->szBlkCol);
n += tGetI32v(p + n, &pHdr->nRow);
n += tGetI8(p + n, &pHdr->cmprAlg);
if ((code = tBufferGetU32(br, &pHdr->delimiter))) return code;
if ((code = tBufferGetU32v(br, &pHdr->fmtVer))) return code;
if ((code = tBufferGetI64(br, &pHdr->suid))) return code;
if ((code = tBufferGetI64(br, &pHdr->uid))) return code;
if ((code = tBufferGetI32v(br, &pHdr->szUid))) return code;
if ((code = tBufferGetI32v(br, &pHdr->szVer))) return code;
if ((code = tBufferGetI32v(br, &pHdr->szKey))) return code;
if ((code = tBufferGetI32v(br, &pHdr->szBlkCol))) return code;
if ((code = tBufferGetI32v(br, &pHdr->nRow))) return code;
if ((code = tBufferGetI8(br, &pHdr->cmprAlg))) return code;
if (pHdr->fmtVer == 1) {
n += tGetI8(p + n, &pHdr->numOfPKs);
if ((code = tBufferGetI8(br, &pHdr->numOfPKs))) return code;
for (int i = 0; i < pHdr->numOfPKs; i++) {
if ((code = tGetBlockCol(br, &pHdr->primaryBlockCols[i]))) return code;
}
} else {
pHdr->numOfPKs = 0;
}
return n;
return 0;
}
// ALGORITHM ==============================
int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg) {
int32_t n = 0;
int32_t tPutColumnDataAgg(SBuffer *buffer, SColumnDataAgg *pColAgg) {
int32_t code;
n += tPutI16v(p ? p + n : p, pColAgg->colId);
n += tPutI16v(p ? p + n : p, pColAgg->numOfNull);
n += tPutI64(p ? p + n : p, pColAgg->sum);
n += tPutI64(p ? p + n : p, pColAgg->max);
n += tPutI64(p ? p + n : p, pColAgg->min);
if ((code = tBufferPutI16v(buffer, pColAgg->colId))) return code;
if ((code = tBufferPutI16v(buffer, pColAgg->numOfNull))) return code;
if ((code = tBufferPutI64(buffer, pColAgg->sum))) return code;
if ((code = tBufferPutI64(buffer, pColAgg->max))) return code;
if ((code = tBufferPutI64(buffer, pColAgg->min))) return code;
return n;
return 0;
}
int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg) {
int32_t n = 0;
int32_t tGetColumnDataAgg(SBufferReader *br, SColumnDataAgg *pColAgg) {
int32_t code;
n += tGetI16v(p + n, &pColAgg->colId);
n += tGetI16v(p + n, &pColAgg->numOfNull);
n += tGetI64(p + n, &pColAgg->sum);
n += tGetI64(p + n, &pColAgg->max);
n += tGetI64(p + n, &pColAgg->min);
if ((code = tBufferGetI16v(br, &pColAgg->colId))) return code;
if ((code = tBufferGetI16v(br, &pColAgg->numOfNull))) return code;
if ((code = tBufferGetI64(br, &pColAgg->sum))) return code;
if ((code = tBufferGetI64(br, &pColAgg->max))) return code;
if ((code = tBufferGetI64(br, &pColAgg->min))) return code;
return n;
return 0;
}
int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut,

View File

@ -238,6 +238,12 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) {
// if the number of primary keys are not the same,
// return an error code and the caller should handle it
return TSDB_CODE_INVALID_PARA;
} else {
for (int i = 0; i < brinBlock->numOfPKs; i++) {
if (record->firstKey.key.pks[i].type != brinBlock->firstKeyPKs[i].type) {
return TSDB_CODE_INVALID_PARA;
}
}
}
code = tBufferPutI64(&brinBlock->suids, record->suid);
@ -387,89 +393,6 @@ int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record) {
return 0;
}
// int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer) {
// int32_t code;
// SBuffer *helperBuffer = NULL; // TODO
// brinBlk->dp[0].size = 0;
// brinBlk->numRec = brinBlock->numOfRecords;
// brinBlk->numOfPKs = brinBlock->numOfPKs;
// // minTbid
// code = tBufferGet(&brinBlock->suids, 0, sizeof(brinBlk->minTbid.suid), &brinBlk->minTbid.suid);
// if (code) return code;
// code = tBufferGet(&brinBlock->uids, 0, sizeof(brinBlk->minTbid.uid), &brinBlk->minTbid.uid);
// if (code) return code;
// // maxTbid
// code =
// tBufferGet(&brinBlock->suids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.suid),
// &brinBlk->maxTbid.suid);
// if (code) return code;
// code = tBufferGet(&brinBlock->uids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.uid),
// &brinBlk->maxTbid.uid); if (code) return code;
// // minVer and maxVer
// const int64_t *minVers = (int64_t *)tBufferGetData(&brinBlock->minVers);
// const int64_t *maxVers = (int64_t *)tBufferGetData(&brinBlock->maxVers);
// brinBlk->minVer = minVers[0];
// brinBlk->maxVer = maxVers[0];
// for (int32_t i = 1; i < brinBlock->numOfRecords; ++i) {
// if (minVers[i] < brinBlk->minVer) brinBlk->minVer = minVers[i];
// if (maxVers[i] > brinBlk->maxVer) brinBlk->maxVer = maxVers[i];
// }
// // compress data
// for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) {
// SBuffer *bf = &brinBlock->buffers[i];
// SCompressInfo info = {
// .cmprAlg = brinBlk->cmprAlg,
// };
// if (tBufferGetSize(bf) == 8 * brinBlock->numOfRecords) {
// info.dataType = TSDB_DATA_TYPE_BIGINT;
// } else if (tBufferGetSize(bf) == 4 * brinBlock->numOfRecords) {
// info.dataType = TSDB_DATA_TYPE_INT;
// } else {
// ASSERT(0);
// }
// code = tCompressDataToBuffer(tBufferGetData(bf), tBufferGetSize(bf), &info, buffer, helperBuffer);
// if (code) return code;
// brinBlk->size[i] = info.compressedSize;
// brinBlk->dp[0].size += info.compressedSize;
// }
// // encode primary keys
// SValueColumnCompressInfo firstKeyPKsInfos[TD_MAX_PK_COLS];
// SValueColumnCompressInfo lastKeyPKsInfos[TD_MAX_PK_COLS];
// for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) {
// SValueColumn *vc = &brinBlock->firstKeyPKs[i];
// firstKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg;
// code = tValueColumnCompress(vc, &firstKeyPKsInfos[i], buffer, helperBuffer);
// if (code) return code;
// }
// for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) {
// SValueColumn *vc = &brinBlock->lastKeyPKs[i];
// lastKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg;
// code = tValueColumnCompress(vc, &lastKeyPKsInfos[i], buffer, helperBuffer);
// if (code) return code;
// }
// return 0;
// }
// int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock) {
// if (brinBlk->fmtVersion == 0) {
// return tBrinBlockDecodeVersion0(buffer, brinBlk, brinBlock);
// } else if (brinBlk->fmtVersion == 1) {
// return tBrinBlockDecodeVersion1(buffer, brinBlk, brinBlock);
// } else {
// ASSERT(0);
// }
// return 0;
// }
// other apis ----------
int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb) {
if (tbid->suid) {

View File

@ -183,8 +183,6 @@ int32_t tBrinBlockDestroy(SBrinBlock *brinBlock);
int32_t tBrinBlockClear(SBrinBlock *brinBlock);
int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record);
int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record);
// int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer);
// int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock);
// other apis
int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb);