more code

This commit is contained in:
Hongze Cheng 2024-02-23 14:44:46 +08:00
parent 665cc2c2c4
commit aaa36b580e
13 changed files with 686 additions and 240 deletions

View File

@ -95,13 +95,26 @@ const static uint8_t BIT2_MAP[4] = {0b11111100, 0b11110011, 0b11001111, 0b001111
#define COL_VAL_IS_VALUE(CV) ((CV)->flag == CV_FLAG_VALUE)
// SValueColumn ================================
typedef struct {
int8_t cmprAlg; // filled by caller
int8_t type; // filled by compress
int32_t originalDataSize; // filled by compress
int32_t compressedDataSize; // filled by compress
int32_t originalOffsetSize; // filled by compress
int32_t compressedOffsetSize; // filled by compress
} SValueColumnCompressInfo;
int32_t tValueColumnInit(SValueColumn *valCol);
int32_t tValueColumnDestroy(SValueColumn *valCol);
int32_t tValueColumnClear(SValueColumn *valCol);
int32_t tValueColumnAppend(SValueColumn *valCol, const SValue *value);
int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value);
int32_t tValueColumnCompress(SValueColumn *valCol, SBuffer *buffer);
int32_t tValueColumnDecompress(SBuffer *buffer, SValueColumn *valCol);
int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *compressInfo, SBuffer *buffer,
SBuffer *helperBuffer);
int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColumnCompressInfo *compressInfo,
SValueColumn *valCol, SBuffer *helperBuffer);
int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBufferWriter *writer);
int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *compressInfo);
// SRow ================================
int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow);
@ -298,6 +311,18 @@ struct SValueColumn {
SBuffer offsets;
};
typedef struct {
int8_t dataType; // fill by caller
int8_t cmprAlg; // fill by caller
int32_t originalSize; // fill by compress
int32_t compressedSize; // fill by compress
} SCompressInfo;
int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *cmprInfo, SBuffer *buffer,
SBuffer *helperBuffer);
int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *cmprInfo, SBuffer *buffer,
SBuffer *helperBuffer);
#endif
#ifdef __cplusplus

View File

@ -33,9 +33,12 @@ static int32_t tBufferDestroy(SBuffer *buffer);
static int32_t tBufferClear(SBuffer *buffer);
static int32_t tBufferEnsureCapacity(SBuffer *buffer, uint32_t capacity);
static int32_t tBufferAppend(SBuffer *buffer, const void *data, uint32_t size);
#define tBufferGetSize(buffer) ((buffer)->size)
#define tBufferGetCapacity(buffer) ((buffer)->capacity)
#define tBufferGetData(buffer) ((buffer)->data)
static int32_t tBufferGet(SBuffer *buffer, int32_t index, uint32_t size, void *data);
#define tBufferGetSize(buffer) ((buffer)->size)
#define tBufferGetCapacity(buffer) ((buffer)->capacity)
#define tBufferGetData(buffer) ((buffer)->data)
#define tBufferGetDataAt(buffer, idx) ((char *)(buffer)->data + (idx))
#define tBufferGetDataEnd(buffer) ((char *)(buffer)->data + (buffer)->size)
// SBufferWriter
#define BUFFER_WRITER_INITIALIZER(forward, offset, buffer) ((SBufferWriter){forward, offset, buffer})

View File

@ -73,7 +73,7 @@ static FORCE_INLINE int32_t tBufferEnsureCapacity(SBuffer *buffer, uint32_t capa
return 0;
}
static int32_t tBufferAppend(SBuffer *buffer, const void *data, uint32_t size) {
static FORCE_INLINE int32_t tBufferAppend(SBuffer *buffer, const void *data, uint32_t size) {
int32_t code = tBufferEnsureCapacity(buffer, buffer->size + size);
if (code) return code;
memcpy((char *)buffer->data + buffer->size, data, size);
@ -81,6 +81,14 @@ static int32_t tBufferAppend(SBuffer *buffer, const void *data, uint32_t size) {
return 0;
}
static FORCE_INLINE int32_t tBufferGet(SBuffer *buffer, int32_t index, uint32_t size, void *data) {
if (index < 0 || (index + 1) * size > buffer->size) {
return TSDB_CODE_OUT_OF_RANGE;
}
memcpy(data, (char *)buffer->data + index * size, size);
return 0;
}
// SBufferWriter
static int32_t tBufferWriterInit(SBufferWriter *writer, bool forward, uint32_t offset, SBuffer *buffer) {
writer->forward = forward;

View File

@ -3860,14 +3860,214 @@ int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value) {
return 0;
}
int32_t tValueColumnCompress(SValueColumn *valCol, SBuffer *buffer) {
// TODO
ASSERT(0);
int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *compressInfo, SBuffer *buffer,
SBuffer *helperBuffer) {
int32_t code;
SCompressInfo info;
compressInfo->type = valCol->type;
if (IS_VAR_DATA_TYPE(valCol->type)) {
info.dataType = TSDB_DATA_TYPE_INT;
info.cmprAlg = compressInfo->cmprAlg;
code =
tCompressData(tBufferGetData(&valCol->offsets), tBufferGetSize(&valCol->offsets), &info, buffer, helperBuffer);
if (code) return code;
compressInfo->originalOffsetSize = info.originalSize;
compressInfo->compressedOffsetSize = info.compressedSize;
} else {
compressInfo->originalOffsetSize = 0;
compressInfo->compressedOffsetSize = 0;
}
info.dataType = valCol->type;
info.cmprAlg = compressInfo->cmprAlg;
code = tCompressData(tBufferGetData(&valCol->data), tBufferGetSize(&valCol->data), &info, buffer, helperBuffer);
if (code) return code;
compressInfo->originalDataSize = info.originalSize;
compressInfo->compressedDataSize = info.compressedSize;
return 0;
}
int32_t tValueColumnDecompress(SBuffer *buffer, SValueColumn *valCol) {
ASSERT(0);
// TODO
int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColumnCompressInfo *compressInfo,
SValueColumn *valCol, SBuffer *helperBuffer) {
int32_t code;
SCompressInfo info;
tValueColumnClear(valCol);
valCol->type = compressInfo->type;
if (IS_VAR_DATA_TYPE(valCol->type)) {
ASSERT(inputSize == compressInfo->compressedOffsetSize + compressInfo->compressedDataSize);
info.dataType = TSDB_DATA_TYPE_INT;
info.cmprAlg = compressInfo->cmprAlg;
info.originalSize = compressInfo->originalOffsetSize;
info.compressedSize = compressInfo->compressedOffsetSize;
code = tDecompressData(input, compressInfo->compressedOffsetSize, &info, &valCol->offsets, helperBuffer);
if (code) return code;
valCol->numOfValues = compressInfo->originalOffsetSize / tDataTypes[TSDB_DATA_TYPE_INT].bytes;
} else {
ASSERT(inputSize == compressInfo->compressedDataSize);
valCol->numOfValues = compressInfo->originalDataSize / tDataTypes[valCol->type].bytes;
}
info.dataType = valCol->type;
info.cmprAlg = compressInfo->cmprAlg;
info.originalSize = compressInfo->originalDataSize;
info.compressedSize = compressInfo->compressedDataSize;
code = tDecompressData((char *)input + compressInfo->compressedOffsetSize, compressInfo->compressedDataSize, &info,
&valCol->data, helperBuffer);
if (code) return code;
return 0;
}
int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBufferWriter *writer) {
int32_t code;
uint8_t formatVersion = 0;
// format version
code = tBufferPutU8(writer, formatVersion);
if (code) return code;
// struct info
code = tBufferPutI8(writer, compressInfo->cmprAlg);
if (code) return code;
code = tBufferPutI8(writer, compressInfo->type);
if (code) return code;
code = tBufferPutI32v(writer, compressInfo->originalDataSize);
if (code) return code;
code = tBufferPutI32v(writer, compressInfo->compressedDataSize);
if (code) return code;
code = tBufferPutI32v(writer, compressInfo->originalOffsetSize);
if (code) return code;
code = tBufferPutI32v(writer, compressInfo->compressedOffsetSize);
if (code) return code;
return 0;
}
int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *compressInfo) {
int32_t code;
uint8_t formatVersion;
// format version
code = tBufferGetU8(reader, &formatVersion);
if (code) return code;
if (formatVersion == 0) {
code = tBufferGetI8(reader, &compressInfo->cmprAlg);
if (code) return code;
code = tBufferGetI8(reader, &compressInfo->type);
if (code) return code;
code = tBufferGetI32v(reader, &compressInfo->originalDataSize);
if (code) return code;
code = tBufferGetI32v(reader, &compressInfo->compressedDataSize);
if (code) return code;
code = tBufferGetI32v(reader, &compressInfo->originalOffsetSize);
if (code) return code;
code = tBufferGetI32v(reader, &compressInfo->compressedOffsetSize);
if (code) return code;
} else {
return TSDB_CODE_INVALID_DATA_FMT;
}
return 0;
}
int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *cmprInfo, SBuffer *buffer,
SBuffer *helperBuffer) {
cmprInfo->originalSize = inputSize;
if (cmprInfo->cmprAlg == NO_COMPRESSION) {
cmprInfo->compressedSize = inputSize;
return tBufferAppend(buffer, input, inputSize);
} else {
SBuffer hBuffer;
SBuffer *extraBuffer = helperBuffer;
int32_t extraSizeNeeded = inputSize + COMP_OVERFLOW_BYTES;
tBufferInit(&hBuffer);
int32_t code = tBufferEnsureCapacity(buffer, buffer->size + extraSizeNeeded);
if (code) return code;
if (cmprInfo->cmprAlg == TWO_STAGE_COMP) {
if (extraBuffer == NULL) {
extraBuffer = &hBuffer;
}
code = tBufferEnsureCapacity(extraBuffer, extraSizeNeeded);
if (code) return code;
}
cmprInfo->compressedSize = tDataTypes[cmprInfo->dataType].compFunc( //
(void *)input, // input
inputSize, // input size
inputSize / tDataTypes[cmprInfo->dataType].bytes, // number of elements
tBufferGetDataEnd(buffer), // output
extraSizeNeeded, // output size
cmprInfo->cmprAlg, // compression algorithm
tBufferGetData(extraBuffer), // helper buffer
extraSizeNeeded // extra buffer size
);
if (cmprInfo->compressedSize < 0) {
tBufferDestroy(&hBuffer);
return TSDB_CODE_COMPRESS_ERROR;
}
buffer->size += cmprInfo->compressedSize;
tBufferDestroy(&hBuffer);
}
return 0;
}
int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *cmprInfo, SBuffer *buffer,
SBuffer *helperBuffer) {
if (cmprInfo->cmprAlg == NO_COMPRESSION) {
ASSERT(inputSize == cmprInfo->originalSize);
return tBufferAppend(buffer, input, inputSize);
} else {
SBuffer hBuffer;
SBuffer *extraBuffer = helperBuffer;
int32_t code;
tBufferInit(&hBuffer);
code = tBufferEnsureCapacity(buffer, cmprInfo->originalSize);
if (code) return code;
if (cmprInfo->cmprAlg == TWO_STAGE_COMP) {
if (extraBuffer == NULL) {
extraBuffer = &hBuffer;
}
code = tBufferEnsureCapacity(extraBuffer, cmprInfo->originalSize + COMP_OVERFLOW_BYTES);
if (code) return code;
}
int32_t decompressedSize = tDataTypes[cmprInfo->dataType].decompFunc(
(void *)input, // input
inputSize, // inputSize
cmprInfo->originalSize / tDataTypes[cmprInfo->dataType].bytes, // number of elements
tBufferGetDataEnd(buffer), // output
cmprInfo->originalSize, // output size
cmprInfo->cmprAlg, // compression algorithm
extraBuffer, // helper buffer
cmprInfo->originalSize + COMP_OVERFLOW_BYTES // extra buffer size
);
if (decompressedSize < 0) {
tBufferDestroy(&hBuffer);
return TSDB_CODE_COMPRESS_ERROR;
}
ASSERT(decompressedSize == cmprInfo->originalSize);
buffer->size += decompressedSize;
tBufferDestroy(&hBuffer);
}
return 0;
}

View File

@ -94,7 +94,7 @@ typedef struct STsdbRowKey STsdbRowKey;
#define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM))
#define LOGIC_TO_FILE_OFFSET(LOFFSET, PAGE) \
((LOFFSET) / PAGE_CONTENT_SIZE(PAGE) * (PAGE) + (LOFFSET) % PAGE_CONTENT_SIZE(PAGE))
#define FILE_TO_LOGIC_OFFSET(OFFSET, PAGE) ((OFFSET) / (PAGE)*PAGE_CONTENT_SIZE(PAGE) + (OFFSET) % (PAGE))
#define FILE_TO_LOGIC_OFFSET(OFFSET, PAGE) ((OFFSET) / (PAGE) * PAGE_CONTENT_SIZE(PAGE) + (OFFSET) % (PAGE))
#define PAGE_OFFSET(PGNO, PAGE) (((PGNO)-1) * (PAGE))
#define OFFSET_PGNO(OFFSET, PAGE) ((OFFSET) / (PAGE) + 1)
@ -283,7 +283,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx);
// tsdbRead.c ==============================================================================================
int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap);
void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive);
int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_t* suid);
int32_t tsdbGetTableSchema(SMeta *pMeta, int64_t uid, STSchema **pSchema, int64_t *suid);
// tsdbMerge.c ==============================================================================================
typedef struct {

View File

@ -2550,7 +2550,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie
goto _err;
}
state->iBrinRecord = BRIN_BLOCK_SIZE(&state->brinBlock) - 1;
state->iBrinRecord = state->brinBlock.numOfRecords - 1;
state->state = SFSNEXTROW_BRINBLOCK;
}

View File

@ -195,6 +195,7 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB
tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, reader->config->bufArr[0], brinBlk->dp->size, 0);
TSDB_CHECK_CODE(code, lino, _exit);
#if 0
int32_t size = 0;
tBrinBlockClear(brinBlock);
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) {
@ -218,6 +219,7 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB
size += brinBlk->size[j];
}
#endif
_exit:
if (code) {
@ -801,82 +803,109 @@ int32_t tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxV
int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize,
TBrinBlkArray *brinBlkArray, uint8_t **bufArr, SVersionRange *range) {
if (BRIN_BLOCK_SIZE(brinBlock) == 0) return 0;
if (brinBlock->numOfRecords == 0) return 0;
int32_t code;
// get SBrinBlk
SBrinBlk brinBlk[1] = {
{
.dp[0] =
{
.offset = *fileSize,
.size = 0,
},
.minTbid =
{
.suid = TARRAY2_FIRST(brinBlock->suid),
.uid = TARRAY2_FIRST(brinBlock->uid),
},
.maxTbid =
{
.suid = TARRAY2_LAST(brinBlock->suid),
.uid = TARRAY2_LAST(brinBlock->uid),
},
.minVer = TARRAY2_FIRST(brinBlock->minVer),
.maxVer = TARRAY2_FIRST(brinBlock->minVer),
.numRec = BRIN_BLOCK_SIZE(brinBlock),
.cmprAlg = cmprAlg,
},
SBrinBlk brinBlk = {
.numRec = brinBlock->numOfRecords,
.numOfPKs = brinBlock->numOfPKs,
.cmprAlg = cmprAlg,
};
for (int32_t i = 1; i < BRIN_BLOCK_SIZE(brinBlock); i++) {
if (brinBlk->minVer > TARRAY2_GET(brinBlock->minVer, i)) {
brinBlk->minVer = TARRAY2_GET(brinBlock->minVer, i);
}
if (brinBlk->maxVer < TARRAY2_GET(brinBlock->maxVer, i)) {
brinBlk->maxVer = TARRAY2_GET(brinBlock->maxVer, i);
}
brinBlk.dp->offset = *fileSize;
brinBlk.dp->size = 0;
// minTbid
code = tBufferGet(&brinBlock->suids, 0, sizeof(int64_t), &brinBlk.minTbid.suid);
if (code) return code;
code = tBufferGet(&brinBlock->uids, 0, sizeof(int64_t), &brinBlk.minTbid.uid);
if (code) return code;
// maxTbid
code = tBufferGet(&brinBlock->suids, brinBlock->numOfRecords - 1, sizeof(int64_t), &brinBlk.maxTbid.suid);
if (code) return code;
code = tBufferGet(&brinBlock->uids, brinBlock->numOfRecords - 1, sizeof(int64_t), &brinBlk.maxTbid.uid);
if (code) return code;
// minVer and maxVer
const int64_t *minVers = (int64_t *)tBufferGetData(&brinBlock->minVers);
const int64_t *maxVers = (int64_t *)tBufferGetData(&brinBlock->maxVers);
brinBlk.minVer = minVers[0];
brinBlk.maxVer = maxVers[0];
for (int32_t i = 1; i < brinBlock->numOfRecords; i++) {
brinBlk.minVer = TMIN(brinBlk.minVer, minVers[i]);
brinBlk.maxVer = TMAX(brinBlk.maxVer, maxVers[i]);
}
tsdbWriterUpdVerRange(range, brinBlk->minVer, brinBlk->maxVer);
tsdbWriterUpdVerRange(range, brinBlk.minVer, brinBlk.maxVer);
// write to file
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) {
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(brinBlock->dataArr1 + i), TARRAY2_DATA_LEN(brinBlock->dataArr1 + i),
TSDB_DATA_TYPE_BIGINT, brinBlk->cmprAlg, &bufArr[0], 0, &brinBlk->size[i], &bufArr[1]);
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) {
SBuffer *bf = &brinBlock->buffers[i];
SCompressInfo info = {
.cmprAlg = cmprAlg,
};
if (tBufferGetSize(bf) == 8 * brinBlock->numOfRecords) {
info.dataType = TSDB_DATA_TYPE_BIGINT;
} else if (tBufferGetSize(bf) == 4 * brinBlock->numOfRecords) {
info.dataType = TSDB_DATA_TYPE_INT;
} else {
ASSERT(0);
}
tBufferClear(NULL /* TODO */);
code = tCompressData(tBufferGetData(bf), tBufferGetSize(bf), &info, NULL /* TODO */, NULL /* TODO*/);
if (code) return code;
code = tsdbWriteFile(fd, *fileSize, bufArr[0], brinBlk->size[i]);
code = tsdbWriteFile(fd, *fileSize, NULL /* TODO */, info.compressedSize);
if (code) return code;
brinBlk->dp->size += brinBlk->size[i];
*fileSize += brinBlk->size[i];
brinBlk.size[i] = info.compressedSize;
brinBlk.dp->size += info.compressedSize;
*fileSize += info.compressedSize;
}
for (int32_t i = 0, j = ARRAY_SIZE(brinBlock->dataArr1); i < ARRAY_SIZE(brinBlock->dataArr2); i++, j++) {
code = tsdbCmprData((uint8_t *)TARRAY2_DATA(brinBlock->dataArr2 + i), TARRAY2_DATA_LEN(brinBlock->dataArr2 + i),
TSDB_DATA_TYPE_INT, brinBlk->cmprAlg, &bufArr[0], 0, &brinBlk->size[j], &bufArr[1]);
if (code) return code;
// write primary keys to file
if (brinBlock->numOfPKs > 0) {
SBufferWriter writer;
SValueColumnCompressInfo vcinfo = {.cmprAlg = cmprAlg};
code = tsdbWriteFile(fd, *fileSize, bufArr[0], brinBlk->size[j]);
if (code) return code;
tBufferClear(NULL);
tBufferWriterInit(&writer, true, 0, NULL /* TODO */);
brinBlk->dp->size += brinBlk->size[j];
*fileSize += brinBlk->size[j];
for (int32_t i = 0; i < brinBlk.numOfPKs; i++) {
code = tValueColumnCompress(&brinBlock->firstKeyPKs[i], &vcinfo, NULL /* TODO */, NULL /* TODO */);
if (code) return code;
code = tValueColumnCompressInfoEncode(&vcinfo, &writer);
if (code) return code;
}
for (int32_t i = 0; i < brinBlk.numOfPKs; i++) {
code = tValueColumnCompress(&brinBlock->lastKeyPKs[i], &vcinfo, NULL /* TODO */, NULL /* TODO */);
if (code) return code;
code = tValueColumnCompressInfoEncode(&vcinfo, &writer);
if (code) return code;
}
// write to file
// TODO
ASSERT(0);
// code = tsdbWriteFile(fd, *fileSize, NULL /* TODO */, tBufferGetSize(NULL));
// if (code) return code;
// *fileSize += tBufferGetSize(NULL);
// brinBlk->dp->size += tBufferGetSize(NULL);
// code = tsdbWriteFile(fd, *fileSize, NULL /* TODO */, tBufferGetSize(NULL));
// if (code) return code;
// *fileSize += tBufferGetSize(NULL);
// brinBlk->dp->size += tBufferGetSize(NULL);
tBufferWriterDestroy(writer);
}
#if 0
SBrinRecord record;
for (int32_t i = 0; i < BRIN_BLOCK_SIZE(brinBlock); i++) {
tBrinBlockGet(brinBlock, i, &record);
tsdbInfo("write brin block, block num:%04d, idx:%04d suid:%ld, uid:%ld, offset:%ld, numRow:%d, count:%d",
TARRAY2_SIZE(brinBlkArray), i, record.suid, record.uid, record.blockOffset, record.numRow, record.count);
}
#endif
// append to brinBlkArray
code = TARRAY2_APPEND_PTR(brinBlkArray, brinBlk);
code = TARRAY2_APPEND_PTR(brinBlkArray, &brinBlk);
if (code) return code;
tBrinBlockClear(brinBlock);
@ -885,7 +914,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl
}
static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) {
if (BRIN_BLOCK_SIZE(writer->brinBlock) == 0) return 0;
if (writer->brinBlock->numOfRecords == 0) return 0;
int32_t code = 0;
int32_t lino = 0;
@ -909,7 +938,7 @@ static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinR
code = tBrinBlockPut(writer->brinBlock, record);
TSDB_CHECK_CODE(code, lino, _exit);
if (BRIN_BLOCK_SIZE(writer->brinBlock) >= writer->config->maxRow) {
if ((writer->brinBlock->numOfRecords) >= writer->config->maxRow) {
code = tsdbDataFileWriteBrinBlock(writer);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -932,10 +961,6 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
SBrinRecord record[1] = {{
.suid = bData->suid,
.uid = bData->uid,
.firstKey = bData->aTSKEY[0],
.firstKeyVer = bData->aVersion[0],
.lastKey = bData->aTSKEY[bData->nRow - 1],
.lastKeyVer = bData->aVersion[bData->nRow - 1],
.minVer = bData->aVersion[0],
.maxVer = bData->aVersion[0],
.blockOffset = writer->files[TSDB_FTYPE_DATA].size,
@ -947,6 +972,9 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData
.count = 1,
}};
tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &record->firstKey);
tsdbRowGetKey(&tsdbRowFromBlockData(bData, bData->nRow - 1), &record->lastKey);
for (int32_t i = 1; i < bData->nRow; ++i) {
if (bData->aTSKEY[i] != bData->aTSKEY[i - 1]) {
record->count++;
@ -1074,46 +1102,57 @@ _exit:
return code;
}
static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const TSDBKEY *key) {
static int32_t tsdbRowKeyCmprNullAsLargest(const STsdbRowKey *key1, const STsdbRowKey *key2) {
if (key1 == NULL) {
return 1;
} else if (key2 == NULL) {
return -1;
} else {
return tsdbRowKeyCmpr(key1, key2);
}
}
static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const STsdbRowKey *key) {
if (writer->ctx->tbHasOldData == false) return 0;
int32_t code = 0;
int32_t lino = 0;
int32_t code = 0;
int32_t lino = 0;
STsdbRowKey rowKey;
for (;;) {
for (;;) {
// SBlockData
for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) {
if (key->ts < writer->ctx->blockData->aTSKEY[writer->ctx->blockDataIdx] //
|| (key->ts == writer->ctx->blockData->aTSKEY[writer->ctx->blockDataIdx] &&
key->version < writer->ctx->blockData->aVersion[writer->ctx->blockDataIdx])) {
goto _exit;
} else {
TSDBROW row = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx);
TSDBROW row = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx);
tsdbRowGetKey(&row, &rowKey);
if (tsdbRowKeyCmprNullAsLargest(&rowKey, key) < 0) { // key <= rowKey
code = tsdbDataFileDoWriteTSRow(writer, &row);
TSDB_CHECK_CODE(code, lino, _exit);
} else {
goto _exit;
}
}
// SBrinBlock
if (writer->ctx->brinBlockIdx >= BRIN_BLOCK_SIZE(writer->ctx->brinBlock)) {
if (writer->ctx->brinBlockIdx >= writer->ctx->brinBlock->numOfRecords) {
break;
}
for (; writer->ctx->brinBlockIdx < BRIN_BLOCK_SIZE(writer->ctx->brinBlock); writer->ctx->brinBlockIdx++) {
if (TARRAY2_GET(writer->ctx->brinBlock->uid, writer->ctx->brinBlockIdx) != writer->ctx->tbid->uid) {
for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) {
SBrinRecord record;
tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record);
if (record.uid != writer->ctx->tbid->uid) {
writer->ctx->tbHasOldData = false;
goto _exit;
}
if (key->ts < TARRAY2_GET(writer->ctx->brinBlock->firstKey, writer->ctx->brinBlockIdx) //
|| (key->ts == TARRAY2_GET(writer->ctx->brinBlock->firstKey, writer->ctx->brinBlockIdx) &&
key->version < TARRAY2_GET(writer->ctx->brinBlock->firstKeyVer, writer->ctx->brinBlockIdx))) {
if (tsdbRowKeyCmpr(key, &record.firstKey) < 0) { // key < record->firstKey
goto _exit;
} else {
SBrinRecord record[1];
tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record);
if (key->ts > record->lastKey || (key->ts == record->lastKey && key->version > record->maxVer)) {
if (tsdbRowKeyCmprNullAsLargest(key, &record->lastKey) > 0) { // key > record->lastKey
if (writer->blockData->nRow > 0) {
code = tsdbDataFileDoWriteBlockData(writer, writer->blockData);
TSDB_CHECK_CODE(code, lino, _exit);
@ -1166,16 +1205,10 @@ static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row)
int32_t lino = 0;
if (writer->ctx->tbHasOldData) {
TSDBKEY key[1];
if (row->type == TSDBROW_ROW_FMT) {
key->ts = row->pTSRow->ts;
key->version = row->version;
} else {
key->ts = row->pBlockData->aTSKEY[row->iRow];
key->version = row->pBlockData->aVersion[row->iRow];
}
STsdbRowKey key;
tsdbRowGetKey(row, &key);
code = tsdbDataFileDoWriteTableOldData(writer, key);
code = tsdbDataFileDoWriteTableOldData(writer, &key);
TSDB_CHECK_CODE(code, lino, _exit);
}
@ -1196,12 +1229,7 @@ static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) {
int32_t lino = 0;
if (writer->ctx->tbHasOldData) {
TSDBKEY key = {
.ts = TSKEY_MAX,
.version = VERSION_MAX,
};
code = tsdbDataFileDoWriteTableOldData(writer, &key);
code = tsdbDataFileDoWriteTableOldData(writer, NULL /* as largest key */);
TSDB_CHECK_CODE(code, lino, _exit);
ASSERT(writer->ctx->tbHasOldData == false);
@ -1229,35 +1257,32 @@ static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TA
TABLEID tbid1[1];
writer->ctx->tbHasOldData = false;
while (writer->ctx->brinBlkArray) { // skip data of previous table
for (; writer->ctx->brinBlockIdx < BRIN_BLOCK_SIZE(writer->ctx->brinBlock); writer->ctx->brinBlockIdx++) {
TABLEID tbid2[1] = {{
.suid = TARRAY2_GET(writer->ctx->brinBlock->suid, writer->ctx->brinBlockIdx),
.uid = TARRAY2_GET(writer->ctx->brinBlock->uid, writer->ctx->brinBlockIdx),
}};
for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) {
SBrinRecord record;
tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record);
if (tbid2->uid == tbid->uid) {
if (record.uid == tbid->uid) {
writer->ctx->tbHasOldData = true;
goto _begin;
} else if (tbid2->suid > tbid->suid || (tbid2->suid == tbid->suid && tbid2->uid > tbid->uid)) {
} else if (record.suid > tbid->suid || (record.suid == tbid->suid && record.uid > tbid->uid)) {
goto _begin;
} else {
if (tbid2->uid != writer->ctx->tbid->uid) {
if (drop && tbid1->uid == tbid2->uid) {
if (record.uid != writer->ctx->tbid->uid) {
if (drop && tbid1->uid == record.uid) {
continue;
} else if (metaGetInfo(writer->config->tsdb->pVnode->pMeta, tbid2->uid, &info, NULL) != 0) {
} else if (metaGetInfo(writer->config->tsdb->pVnode->pMeta, record.uid, &info, NULL) != 0) {
drop = true;
*tbid1 = *tbid2;
tbid1->suid = record.suid;
tbid1->uid = record.uid;
continue;
} else {
drop = false;
writer->ctx->tbid[0] = *tbid2;
writer->ctx->tbid->suid = record.suid;
writer->ctx->tbid->uid = record.uid;
}
}
SBrinRecord record[1];
tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record);
code = tsdbDataFileWriteBrinRecord(writer, record);
code = tsdbDataFileWriteBrinRecord(writer, &record);
TSDB_CHECK_CODE(code, lino, _exit);
}
}
@ -1836,12 +1861,7 @@ int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) {
}
if (writer->ctx->tbHasOldData) {
TSDBKEY key = {
.ts = bData->aTSKEY[0],
.version = bData->aVersion[0],
};
code = tsdbDataFileDoWriteTableOldData(writer, &key);
code = tsdbDataFileDoWriteTableOldData(writer, NULL /* as largest key */);
TSDB_CHECK_CODE(code, lino, _exit);
}

View File

@ -147,12 +147,11 @@ static int32_t tsdbDataIterNext(STsdbIter *iter, const TABLEID *tbid) {
}
// SBrinBlock
if (iter->dataData->brinBlockIdx >= BRIN_BLOCK_SIZE(iter->dataData->brinBlock)) {
if (iter->dataData->brinBlockIdx >= iter->dataData->brinBlock->numOfRecords) {
break;
}
for (; iter->dataData->brinBlockIdx < BRIN_BLOCK_SIZE(iter->dataData->brinBlock);
iter->dataData->brinBlockIdx++) {
for (; iter->dataData->brinBlockIdx < iter->dataData->brinBlock->numOfRecords; iter->dataData->brinBlockIdx++) {
SBrinRecord record[1];
tBrinBlockGet(iter->dataData->brinBlock, iter->dataData->brinBlockIdx, record);

View File

@ -591,7 +591,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
}
// 1. time range check
if (pRecord->firstKey > w.ekey || pRecord->lastKey < w.skey) {
if (pRecord->firstKey.key.ts > w.ekey || pRecord->lastKey.key.ts < w.skey) {
continue;
}
@ -609,11 +609,11 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN
return TSDB_CODE_OUT_OF_MEMORY;
}
if (pScanInfo->filesetWindow.skey > pRecord->firstKey) {
pScanInfo->filesetWindow.skey = pRecord->firstKey;
if (pScanInfo->filesetWindow.skey > pRecord->firstKey.key.ts) {
pScanInfo->filesetWindow.skey = pRecord->firstKey.key.ts;
}
if (pScanInfo->filesetWindow.ekey < pRecord->lastKey) {
pScanInfo->filesetWindow.ekey = pRecord->lastKey;
if (pScanInfo->filesetWindow.ekey < pRecord->lastKey.key.ts) {
pScanInfo->filesetWindow.ekey = pRecord->lastKey.key.ts;
}
pBlockNum->numOfBlocks += 1;
@ -744,9 +744,9 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData
int32_t endPos = -1;
bool asc = ASCENDING_TRAVERSE(pReader->info.order);
if (asc && pReader->info.window.ekey >= pRecord->lastKey) {
if (asc && pReader->info.window.ekey >= pRecord->lastKey.key.ts) {
endPos = pRecord->numRow - 1;
} else if (!asc && pReader->info.window.skey <= pRecord->firstKey) {
} else if (!asc && pReader->info.window.skey <= pRecord->firstKey.key.ts) {
endPos = 0;
} else {
int64_t key = asc ? pReader->info.window.ekey : pReader->info.window.skey;
@ -889,8 +889,12 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo
static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInfo) {
record->uid = pBlockInfo->uid;
record->firstKey = pBlockInfo->firstKey;
record->lastKey = pBlockInfo->lastKey;
record->firstKey = (STsdbRowKey){
.key = {.ts = pBlockInfo->firstKey, .numOfPKs = 0},
};
record->lastKey = (STsdbRowKey){
.key = {.ts = pBlockInfo->lastKey, .numOfPKs = 0},
};
record->minVer = pBlockInfo->minVer;
record->maxVer = pBlockInfo->maxVer;
record->blockOffset = pBlockInfo->blockOffset;
@ -933,9 +937,10 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
// row index of dump info remain the initial position, let's find the appropriate start position.
if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pRecord->numRow - 1 && (!asc))) {
if (asc && pReader->info.window.skey <= pRecord->firstKey && pReader->info.verRange.minVer <= pRecord->minVer) {
if (asc && pReader->info.window.skey <= pRecord->firstKey.key.ts &&
pReader->info.verRange.minVer <= pRecord->minVer) {
// pDumpInfo->rowIndex = 0;
} else if (!asc && pReader->info.window.ekey >= pRecord->lastKey &&
} else if (!asc && pReader->info.window.ekey >= pRecord->lastKey.key.ts &&
pReader->info.verRange.maxVer >= pRecord->maxVer) {
// pDumpInfo->rowIndex = pRecord->numRow - 1;
} else { // find the appropriate the start position in current block, and set it to be the current rowIndex
@ -948,8 +953,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
tsdbError(
"%p failed to locate the start position in current block, global index:%d, table index:%d, brange:%" PRId64
"-%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 " %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, pRecord->minVer,
pRecord->maxVer, pReader->idStr);
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey.key.ts, pRecord->lastKey.key.ts,
pRecord->minVer, pRecord->maxVer, pReader->idStr);
return TSDB_CODE_INVALID_PARA;
}
@ -1058,7 +1063,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
setBlockAllDumped(pDumpInfo, ts, pReader->info.order);
}
} else {
int64_t ts = asc ? pRecord->lastKey : pRecord->firstKey;
int64_t ts = asc ? pRecord->lastKey.key.ts : pRecord->firstKey.key.ts;
setBlockAllDumped(pDumpInfo, ts, pReader->info.order);
}
@ -1068,8 +1073,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) {
int32_t unDumpedRows = asc ? pRecord->numRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1;
tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, dumpedRows,
unDumpedRows, pRecord->minVer, pRecord->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr);
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey.key.ts, pRecord->lastKey.key.ts,
dumpedRows, unDumpedRows, pRecord->minVer, pRecord->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr);
return TSDB_CODE_SUCCESS;
}
@ -1131,8 +1136,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI
tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64
", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s",
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, pRecord->numRow,
pRecord->minVer, pRecord->maxVer, elapsedTime, pReader->idStr);
pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey.key.ts, pRecord->lastKey.key.ts,
pRecord->numRow, pRecord->minVer, pRecord->maxVer, elapsedTime, pReader->idStr);
pReader->cost.blockLoadTime += elapsedTime;
pDumpInfo->allDumped = false;
@ -1235,9 +1240,9 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte
static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* pRec, int32_t order) {
// it is the last block in current file, no chance to overlap with neighbor blocks.
if (ASCENDING_TRAVERSE(order)) {
return pBlock->lastKey == pRec->firstKey;
return pBlock->lastKey == pRec->firstKey.key.ts;
} else {
return pBlock->firstKey == pRec->lastKey;
return pBlock->firstKey == pRec->lastKey.key.ts;
}
}

View File

@ -281,7 +281,7 @@ void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray
}
SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) {
if (pIter->blockIndex == -1 || (pIter->recordIndex + 1) >= TARRAY2_SIZE(pIter->block.numRow)) {
if (pIter->blockIndex == -1 || (pIter->recordIndex + 1) >= pIter->block.numOfRecords) {
pIter->blockIndex += 1;
if (pIter->blockIndex >= taosArrayGetSize(pIter->pBrinBlockList)) {
return NULL;
@ -358,8 +358,8 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v
static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record){
pBlockInfo->uid = record->uid;
pBlockInfo->firstKey = record->firstKey;
pBlockInfo->lastKey = record->lastKey;
pBlockInfo->firstKey = record->firstKey.key.ts;
pBlockInfo->lastKey = record->lastKey.key.ts;
pBlockInfo->minVer = record->minVer;
pBlockInfo->maxVer = record->maxVer;
pBlockInfo->blockOffset = record->blockOffset;
@ -962,15 +962,15 @@ static bool doCheckDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const S
for (int32_t i = startIndex; i < num; i += 1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
if (p->ts >= pRecord->firstKey && p->ts <= pRecord->lastKey) {
if (p->ts >= pRecord->firstKey.key.ts && p->ts <= pRecord->lastKey.key.ts) {
if (p->version >= pRecord->minVer) {
return true;
}
} else if (p->ts < pRecord->firstKey) { // p->ts < pBlock->minKey.ts
} else if (p->ts < pRecord->firstKey.key.ts) { // p->ts < pBlock->minKey.ts
if (p->version >= pRecord->minVer) {
if (i < num - 1) {
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
if (pnext->ts >= pRecord->firstKey) {
if (pnext->ts >= pRecord->firstKey.key.ts) {
return true;
}
} else { // it must be the last point
@ -991,12 +991,12 @@ static bool doCheckDatablockOverlapWithoutVersion(STableBlockScanInfo* pBlockSca
for (int32_t i = startIndex; i < num; i += 1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i);
if (p->ts >= pRecord->firstKey && p->ts <= pRecord->lastKey) {
if (p->ts >= pRecord->firstKey.key.ts && p->ts <= pRecord->lastKey.key.ts) {
return true;
} else if (p->ts < pRecord->firstKey) { // p->ts < pBlock->minKey.ts
} else if (p->ts < pRecord->firstKey.key.ts) { // p->ts < pBlock->minKey.ts
if (i < num - 1) {
TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1);
if (pnext->ts >= pRecord->firstKey) {
if (pnext->ts >= pRecord->firstKey.key.ts) {
return true;
}
}
@ -1016,7 +1016,7 @@ bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecor
// ts is not overlap
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
if (pRecord->firstKey > pLast->ts || pRecord->lastKey < pFirst->ts) {
if (pRecord->firstKey.key.ts > pLast->ts || pRecord->lastKey.key.ts < pFirst->ts) {
return false;
}
@ -1027,10 +1027,10 @@ bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecor
int32_t index = pBlockScanInfo->fileDelIndex;
while (1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
if (p->ts > pRecord->firstKey && index > 0) {
if (p->ts > pRecord->firstKey.key.ts && index > 0) {
index -= 1;
} else { // find the first point that is smaller than the minKey.ts of dataBlock.
if (p->ts == pRecord->firstKey && p->version < pRecord->maxVer && index > 0) {
if (p->ts == pRecord->firstKey.key.ts && p->version < pRecord->maxVer && index > 0) {
index -= 1;
}
break;
@ -1049,7 +1049,7 @@ bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const
// ts is not overlap
TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0);
TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline);
if (pRecord->firstKey > pLast->ts || pRecord->lastKey < pFirst->ts) {
if (pRecord->firstKey.key.ts > pLast->ts || pRecord->lastKey.key.ts < pFirst->ts) {
return false;
}
@ -1060,10 +1060,10 @@ bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const
int32_t index = pBlockScanInfo->fileDelIndex;
while (1) {
TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index);
if (p->ts > pRecord->firstKey && index > 0) {
if (p->ts > pRecord->firstKey.key.ts && index > 0) {
index -= 1;
} else { // find the first point that is smaller than the minKey.ts of dataBlock.
if (p->ts == pRecord->firstKey && index > 0) {
if (p->ts == pRecord->firstKey.key.ts && index > 0) {
index -= 1;
}
break;

View File

@ -97,10 +97,24 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
SBrinRecord record = {
.suid = pBlockIdx->suid,
.uid = pBlockIdx->uid,
.firstKey = dataBlk->minKey.ts,
.firstKeyVer = dataBlk->minKey.version,
.lastKey = dataBlk->maxKey.ts,
.lastKeyVer = dataBlk->maxKey.version,
.firstKey =
(STsdbRowKey){
.key =
(SRowKey){
.ts = dataBlk->minKey.ts,
.numOfPKs = 0,
},
.version = dataBlk->minKey.version,
},
.lastKey =
(STsdbRowKey){
.key =
(SRowKey){
.ts = dataBlk->maxKey.ts,
.numOfPKs = 0,
},
.version = dataBlk->maxKey.version,
},
.minVer = dataBlk->minVer,
.maxVer = dataBlk->maxVer,
.blockOffset = dataBlk->aSubBlock->offset,
@ -119,7 +133,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
code = tBrinBlockPut(ctx->brinBlock, &record);
TSDB_CHECK_CODE(code, lino, _exit);
if (BRIN_BLOCK_SIZE(ctx->brinBlock) >= ctx->maxRow) {
if (ctx->brinBlock->numOfRecords >= ctx->maxRow) {
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
ctx->brinBlkArray, ctx->bufArr, &range);
@ -128,7 +142,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
}
}
if (BRIN_BLOCK_SIZE(ctx->brinBlock) > 0) {
if (ctx->brinBlock->numOfRecords > 0) {
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
ctx->brinBlkArray, ctx->bufArr, &range);

View File

@ -105,56 +105,230 @@ int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecor
// SBrinRecord ----------
int32_t tBrinBlockInit(SBrinBlock *brinBlock) {
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); ++i) {
TARRAY2_INIT(&brinBlock->dataArr1[i]);
brinBlock->numOfPKs = 0;
brinBlock->numOfRecords = 0;
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) {
tBufferInit(&brinBlock->buffers[i]);
}
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr2); ++i) {
TARRAY2_INIT(&brinBlock->dataArr2[i]);
for (int32_t i = 0; i < TD_MAX_PRIMARY_KEY_COL; ++i) {
tValueColumnInit(&brinBlock->firstKeyPKs[i]);
tValueColumnInit(&brinBlock->lastKeyPKs[i]);
}
return 0;
}
int32_t tBrinBlockDestroy(SBrinBlock *brinBlock) {
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); ++i) {
TARRAY2_DESTROY(&brinBlock->dataArr1[i], NULL);
brinBlock->numOfPKs = 0;
brinBlock->numOfRecords = 0;
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) {
tBufferDestroy(&brinBlock->buffers[i]);
}
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr2); ++i) {
TARRAY2_DESTROY(&brinBlock->dataArr2[i], NULL);
for (int32_t i = 0; i < TD_MAX_PRIMARY_KEY_COL; ++i) {
tValueColumnDestroy(&brinBlock->firstKeyPKs[i]);
tValueColumnDestroy(&brinBlock->lastKeyPKs[i]);
}
return 0;
}
int32_t tBrinBlockClear(SBrinBlock *brinBlock) {
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); ++i) {
TARRAY2_CLEAR(&brinBlock->dataArr1[i], NULL);
brinBlock->numOfPKs = 0;
brinBlock->numOfRecords = 0;
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) {
tBufferClear(&brinBlock->buffers[i]);
}
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr2); ++i) {
TARRAY2_CLEAR(&brinBlock->dataArr2[i], NULL);
for (int32_t i = 0; i < TD_MAX_PRIMARY_KEY_COL; ++i) {
tValueColumnClear(&brinBlock->firstKeyPKs[i]);
tValueColumnClear(&brinBlock->lastKeyPKs[i]);
}
return 0;
}
int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) {
int32_t code;
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); ++i) {
code = TARRAY2_APPEND(&brinBlock->dataArr1[i], record->dataArr1[i]);
ASSERT(record->firstKey.key.numOfPKs == record->lastKey.key.numOfPKs);
if (brinBlock->numOfRecords == 0) {
brinBlock->numOfPKs = record->firstKey.key.numOfPKs;
}
ASSERT(brinBlock->numOfPKs == record->firstKey.key.numOfPKs);
code = tBufferAppend(&brinBlock->suids, &record->suid, sizeof(record->suid));
if (code) return code;
code = tBufferAppend(&brinBlock->uids, &record->uid, sizeof(record->uid));
if (code) return code;
code = tBufferAppend(&brinBlock->firstKeyTimestamps, &record->firstKey.key.ts, sizeof(record->firstKey.key.ts));
if (code) return code;
code = tBufferAppend(&brinBlock->firstKeyVersions, &record->firstKey.version, sizeof(record->firstKey.version));
if (code) return code;
for (int32_t i = 0; i < record->firstKey.key.numOfPKs; ++i) {
code = tValueColumnAppend(&brinBlock->firstKeyPKs[i], &record->firstKey.key.pks[i]);
if (code) return code;
}
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr2); ++i) {
code = TARRAY2_APPEND(&brinBlock->dataArr2[i], record->dataArr2[i]);
code = tBufferAppend(&brinBlock->lastKeyTimestamps, &record->lastKey.key.ts, sizeof(record->lastKey.key.ts));
if (code) return code;
code = tBufferAppend(&brinBlock->lastKeyVersions, &record->lastKey.version, sizeof(record->lastKey.version));
if (code) return code;
for (int32_t i = 0; i < record->lastKey.key.numOfPKs; ++i) {
code = tValueColumnAppend(&brinBlock->lastKeyPKs[i], &record->lastKey.key.pks[i]);
if (code) return code;
}
code = tBufferAppend(&brinBlock->minVers, &record->minVer, sizeof(record->minVer));
if (code) return code;
code = tBufferAppend(&brinBlock->maxVers, &record->maxVer, sizeof(record->maxVer));
if (code) return code;
code = tBufferAppend(&brinBlock->blockOffsets, &record->blockOffset, sizeof(record->blockOffset));
if (code) return code;
code = tBufferAppend(&brinBlock->smaOffsets, &record->smaOffset, sizeof(record->smaOffset));
if (code) return code;
code = tBufferAppend(&brinBlock->blockSizes, &record->blockSize, sizeof(record->blockSize));
if (code) return code;
code = tBufferAppend(&brinBlock->blockKeySizes, &record->blockKeySize, sizeof(record->blockKeySize));
if (code) return code;
code = tBufferAppend(&brinBlock->smaSizes, &record->smaSize, sizeof(record->smaSize));
if (code) return code;
code = tBufferAppend(&brinBlock->numRows, &record->numRow, sizeof(record->numRow));
if (code) return code;
code = tBufferAppend(&brinBlock->counts, &record->count, sizeof(record->count));
if (code) return code;
brinBlock->numOfRecords++;
return 0;
}
int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record) {
if (idx >= BRIN_BLOCK_SIZE(brinBlock)) return TSDB_CODE_OUT_OF_RANGE;
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); ++i) {
record->dataArr1[i] = TARRAY2_GET(&brinBlock->dataArr1[i], idx);
int32_t code;
if (idx < 0 || idx >= brinBlock->numOfRecords) {
return TSDB_CODE_OUT_OF_RANGE;
}
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr2); ++i) {
record->dataArr2[i] = TARRAY2_GET(&brinBlock->dataArr2[i], idx);
code = tBufferGet(&brinBlock->suids, idx, sizeof(record->suid), &record->suid);
if (code) return code;
code = tBufferGet(&brinBlock->uids, idx, sizeof(record->uid), &record->uid);
if (code) return code;
code = tBufferGet(&brinBlock->firstKeyTimestamps, idx, sizeof(record->firstKey.key.ts), &record->firstKey.key.ts);
if (code) return code;
code = tBufferGet(&brinBlock->firstKeyVersions, idx, sizeof(record->firstKey.version), &record->firstKey.version);
if (code) return code;
for (record->firstKey.key.numOfPKs = 0; record->firstKey.key.numOfPKs < brinBlock->numOfPKs;
record->firstKey.key.numOfPKs++) {
code = tValueColumnGet(&brinBlock->firstKeyPKs[record->firstKey.key.numOfPKs], idx,
&record->firstKey.key.pks[record->firstKey.key.numOfPKs]);
if (code) return code;
}
code = tBufferGet(&brinBlock->lastKeyTimestamps, idx, sizeof(record->lastKey.key.ts), &record->lastKey.key.ts);
if (code) return code;
code = tBufferGet(&brinBlock->lastKeyVersions, idx, sizeof(record->lastKey.version), &record->lastKey.version);
if (code) return code;
for (record->lastKey.key.numOfPKs = 0; record->lastKey.key.numOfPKs < brinBlock->numOfPKs;
record->lastKey.key.numOfPKs++) {
code = tValueColumnGet(&brinBlock->lastKeyPKs[record->lastKey.key.numOfPKs], idx,
&record->lastKey.key.pks[record->lastKey.key.numOfPKs]);
if (code) return code;
}
code = tBufferGet(&brinBlock->minVers, idx, sizeof(record->minVer), &record->minVer);
if (code) return code;
code = tBufferGet(&brinBlock->maxVers, idx, sizeof(record->maxVer), &record->maxVer);
if (code) return code;
code = tBufferGet(&brinBlock->blockOffsets, idx, sizeof(record->blockOffset), &record->blockOffset);
if (code) return code;
code = tBufferGet(&brinBlock->smaOffsets, idx, sizeof(record->smaOffset), &record->smaOffset);
if (code) return code;
code = tBufferGet(&brinBlock->blockSizes, idx, sizeof(record->blockSize), &record->blockSize);
if (code) return code;
code = tBufferGet(&brinBlock->blockKeySizes, idx, sizeof(record->blockKeySize), &record->blockKeySize);
if (code) return code;
code = tBufferGet(&brinBlock->smaSizes, idx, sizeof(record->smaSize), &record->smaSize);
if (code) return code;
code = tBufferGet(&brinBlock->numRows, idx, sizeof(record->numRow), &record->numRow);
if (code) return code;
code = tBufferGet(&brinBlock->counts, idx, sizeof(record->count), &record->count);
if (code) return code;
return 0;
}
int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer) {
int32_t code;
SBuffer *helperBuffer = NULL; // TODO
brinBlk->dp[0].size = 0;
brinBlk->numRec = brinBlock->numOfRecords;
brinBlk->numOfPKs = brinBlock->numOfPKs;
// minTbid
code = tBufferGet(&brinBlock->suids, 0, sizeof(brinBlk->minTbid.suid), &brinBlk->minTbid.suid);
if (code) return code;
code = tBufferGet(&brinBlock->uids, 0, sizeof(brinBlk->minTbid.uid), &brinBlk->minTbid.uid);
if (code) return code;
// maxTbid
code =
tBufferGet(&brinBlock->suids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.suid), &brinBlk->maxTbid.suid);
if (code) return code;
code = tBufferGet(&brinBlock->uids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.uid), &brinBlk->maxTbid.uid);
if (code) return code;
// minVer and maxVer
const int64_t *minVers = (int64_t *)tBufferGetData(&brinBlock->minVers);
const int64_t *maxVers = (int64_t *)tBufferGetData(&brinBlock->maxVers);
brinBlk->minVer = minVers[0];
brinBlk->maxVer = maxVers[0];
for (int32_t i = 1; i < brinBlock->numOfRecords; ++i) {
if (minVers[i] < brinBlk->minVer) brinBlk->minVer = minVers[i];
if (maxVers[i] > brinBlk->maxVer) brinBlk->maxVer = maxVers[i];
}
// compress data
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) {
SBuffer *bf = &brinBlock->buffers[i];
SCompressInfo info = {
.cmprAlg = brinBlk->cmprAlg,
};
if (tBufferGetSize(bf) == 8 * brinBlock->numOfRecords) {
info.dataType = TSDB_DATA_TYPE_BIGINT;
} else if (tBufferGetSize(bf) == 4 * brinBlock->numOfRecords) {
info.dataType = TSDB_DATA_TYPE_INT;
} else {
ASSERT(0);
}
code = tCompressData(tBufferGetData(bf), tBufferGetSize(bf), &info, buffer, helperBuffer);
if (code) return code;
brinBlk->size[i] = info.compressedSize;
brinBlk->dp[0].size += info.compressedSize;
}
// encode primary keys
SValueColumnCompressInfo firstKeyPKsInfos[TD_MAX_PRIMARY_KEY_COL];
SValueColumnCompressInfo lastKeyPKsInfos[TD_MAX_PRIMARY_KEY_COL];
for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) {
SValueColumn *vc = &brinBlock->firstKeyPKs[i];
firstKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg;
code = tValueColumnCompress(vc, &firstKeyPKsInfos[i], buffer, helperBuffer);
if (code) return code;
}
for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) {
SValueColumn *vc = &brinBlock->lastKeyPKs[i];
lastKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg;
code = tValueColumnCompress(vc, &lastKeyPKsInfos[i], buffer, helperBuffer);
if (code) return code;
}
return 0;
}
int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock) {
// if (brinBlk->fmtVersion == 0) {
// return tBrinBlockDecodeVersion0(buffer, brinBlk, brinBlock);
// } else if (brinBlk->fmtVersion == 1) {
// return tBrinBlockDecodeVersion1(buffer, brinBlk, brinBlock);
// } else {
// ASSERT(0);
// }
return 0;
}

View File

@ -112,52 +112,47 @@ int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *reco
int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record);
// SBrinRecord ----------
typedef union {
struct {
int64_t dataArr1[10];
int32_t dataArr2[5];
};
struct {
int64_t suid;
int64_t uid;
int64_t firstKey;
int64_t firstKeyVer;
int64_t lastKey;
int64_t lastKeyVer;
int64_t minVer;
int64_t maxVer;
int64_t blockOffset;
int64_t smaOffset;
int32_t blockSize;
int32_t blockKeySize;
int32_t smaSize;
int32_t numRow;
int32_t count;
};
typedef struct {
int64_t suid;
int64_t uid;
STsdbRowKey firstKey;
STsdbRowKey lastKey;
int64_t minVer;
int64_t maxVer;
int64_t blockOffset;
int64_t smaOffset;
int32_t blockSize;
int32_t blockKeySize;
int32_t smaSize;
int32_t numRow;
int32_t count;
} SBrinRecord;
typedef union {
struct {
TARRAY2(int64_t) dataArr1[10];
TARRAY2(int32_t) dataArr2[5];
};
struct {
TARRAY2(int64_t) suid[1];
TARRAY2(int64_t) uid[1];
TARRAY2(int64_t) firstKey[1];
TARRAY2(int64_t) firstKeyVer[1];
TARRAY2(int64_t) lastKey[1];
TARRAY2(int64_t) lastKeyVer[1];
TARRAY2(int64_t) minVer[1];
TARRAY2(int64_t) maxVer[1];
TARRAY2(int64_t) blockOffset[1];
TARRAY2(int64_t) smaOffset[1];
TARRAY2(int32_t) blockSize[1];
TARRAY2(int32_t) blockKeySize[1];
TARRAY2(int32_t) smaSize[1];
TARRAY2(int32_t) numRow[1];
TARRAY2(int32_t) count[1];
typedef struct {
int8_t numOfPKs;
int32_t numOfRecords;
union {
SBuffer buffers[15];
struct {
SBuffer suids;
SBuffer uids;
SBuffer firstKeyTimestamps;
SBuffer firstKeyVersions;
SBuffer lastKeyTimestamps;
SBuffer lastKeyVersions;
SBuffer minVers;
SBuffer maxVers;
SBuffer blockOffsets;
SBuffer smaOffsets;
SBuffer blockSizes;
SBuffer blockKeySizes;
SBuffer smaSizes;
SBuffer numRows;
SBuffer counts;
};
};
SValueColumn firstKeyPKs[TD_MAX_PRIMARY_KEY_COL];
SValueColumn lastKeyPKs[TD_MAX_PRIMARY_KEY_COL];
} SBrinBlock;
typedef struct {
@ -169,18 +164,21 @@ typedef struct {
int32_t numRec;
int32_t size[15];
int8_t cmprAlg;
int8_t rsvd[7];
int8_t numOfPKs; // number of primary keys
int8_t rsvd[6];
} SBrinBlk;
typedef TARRAY2(SBrinBlk) TBrinBlkArray;
#define BRIN_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid)
#define BRIN_BLOCK_SIZE(db) ((db)->numOfRecords)
int32_t tBrinBlockInit(SBrinBlock *brinBlock);
int32_t tBrinBlockDestroy(SBrinBlock *brinBlock);
int32_t tBrinBlockClear(SBrinBlock *brinBlock);
int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record);
int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record);
int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer);
int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock);
// other apis
int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb);