diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 2c138298b6..50b3540fd2 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -95,13 +95,26 @@ const static uint8_t BIT2_MAP[4] = {0b11111100, 0b11110011, 0b11001111, 0b001111 #define COL_VAL_IS_VALUE(CV) ((CV)->flag == CV_FLAG_VALUE) // SValueColumn ================================ +typedef struct { + int8_t cmprAlg; // filled by caller + int8_t type; // filled by compress + int32_t originalDataSize; // filled by compress + int32_t compressedDataSize; // filled by compress + int32_t originalOffsetSize; // filled by compress + int32_t compressedOffsetSize; // filled by compress +} SValueColumnCompressInfo; + int32_t tValueColumnInit(SValueColumn *valCol); int32_t tValueColumnDestroy(SValueColumn *valCol); int32_t tValueColumnClear(SValueColumn *valCol); int32_t tValueColumnAppend(SValueColumn *valCol, const SValue *value); int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value); -int32_t tValueColumnCompress(SValueColumn *valCol, SBuffer *buffer); -int32_t tValueColumnDecompress(SBuffer *buffer, SValueColumn *valCol); +int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *compressInfo, SBuffer *buffer, + SBuffer *helperBuffer); +int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColumnCompressInfo *compressInfo, + SValueColumn *valCol, SBuffer *helperBuffer); +int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBufferWriter *writer); +int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *compressInfo); // SRow ================================ int32_t tRowBuild(SArray *aColVal, const STSchema *pTSchema, SRow **ppRow); @@ -298,6 +311,18 @@ struct SValueColumn { SBuffer offsets; }; +typedef struct { + int8_t dataType; // fill by caller + int8_t cmprAlg; // fill by caller + int32_t originalSize; // fill by compress + int32_t compressedSize; // fill by compress +} SCompressInfo; + +int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *cmprInfo, SBuffer *buffer, + SBuffer *helperBuffer); +int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *cmprInfo, SBuffer *buffer, + SBuffer *helperBuffer); + #endif #ifdef __cplusplus diff --git a/include/util/tbuffer.h b/include/util/tbuffer.h index 47db881bad..56645f4403 100644 --- a/include/util/tbuffer.h +++ b/include/util/tbuffer.h @@ -33,9 +33,12 @@ static int32_t tBufferDestroy(SBuffer *buffer); static int32_t tBufferClear(SBuffer *buffer); static int32_t tBufferEnsureCapacity(SBuffer *buffer, uint32_t capacity); static int32_t tBufferAppend(SBuffer *buffer, const void *data, uint32_t size); -#define tBufferGetSize(buffer) ((buffer)->size) -#define tBufferGetCapacity(buffer) ((buffer)->capacity) -#define tBufferGetData(buffer) ((buffer)->data) +static int32_t tBufferGet(SBuffer *buffer, int32_t index, uint32_t size, void *data); +#define tBufferGetSize(buffer) ((buffer)->size) +#define tBufferGetCapacity(buffer) ((buffer)->capacity) +#define tBufferGetData(buffer) ((buffer)->data) +#define tBufferGetDataAt(buffer, idx) ((char *)(buffer)->data + (idx)) +#define tBufferGetDataEnd(buffer) ((char *)(buffer)->data + (buffer)->size) // SBufferWriter #define BUFFER_WRITER_INITIALIZER(forward, offset, buffer) ((SBufferWriter){forward, offset, buffer}) diff --git a/include/util/tbuffer.inc b/include/util/tbuffer.inc index 8650b44f70..aad4bf4acf 100644 --- a/include/util/tbuffer.inc +++ b/include/util/tbuffer.inc @@ -73,7 +73,7 @@ static FORCE_INLINE int32_t tBufferEnsureCapacity(SBuffer *buffer, uint32_t capa return 0; } -static int32_t tBufferAppend(SBuffer *buffer, const void *data, uint32_t size) { +static FORCE_INLINE int32_t tBufferAppend(SBuffer *buffer, const void *data, uint32_t size) { int32_t code = tBufferEnsureCapacity(buffer, buffer->size + size); if (code) return code; memcpy((char *)buffer->data + buffer->size, data, size); @@ -81,6 +81,14 @@ static int32_t tBufferAppend(SBuffer *buffer, const void *data, uint32_t size) { return 0; } +static FORCE_INLINE int32_t tBufferGet(SBuffer *buffer, int32_t index, uint32_t size, void *data) { + if (index < 0 || (index + 1) * size > buffer->size) { + return TSDB_CODE_OUT_OF_RANGE; + } + memcpy(data, (char *)buffer->data + index * size, size); + return 0; +} + // SBufferWriter static int32_t tBufferWriterInit(SBufferWriter *writer, bool forward, uint32_t offset, SBuffer *buffer) { writer->forward = forward; diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 2192e459f5..13da2238bb 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -3860,14 +3860,214 @@ int32_t tValueColumnGet(SValueColumn *valCol, int32_t idx, SValue *value) { return 0; } -int32_t tValueColumnCompress(SValueColumn *valCol, SBuffer *buffer) { - // TODO - ASSERT(0); +int32_t tValueColumnCompress(SValueColumn *valCol, SValueColumnCompressInfo *compressInfo, SBuffer *buffer, + SBuffer *helperBuffer) { + int32_t code; + SCompressInfo info; + + compressInfo->type = valCol->type; + + if (IS_VAR_DATA_TYPE(valCol->type)) { + info.dataType = TSDB_DATA_TYPE_INT; + info.cmprAlg = compressInfo->cmprAlg; + code = + tCompressData(tBufferGetData(&valCol->offsets), tBufferGetSize(&valCol->offsets), &info, buffer, helperBuffer); + if (code) return code; + + compressInfo->originalOffsetSize = info.originalSize; + compressInfo->compressedOffsetSize = info.compressedSize; + } else { + compressInfo->originalOffsetSize = 0; + compressInfo->compressedOffsetSize = 0; + } + + info.dataType = valCol->type; + info.cmprAlg = compressInfo->cmprAlg; + code = tCompressData(tBufferGetData(&valCol->data), tBufferGetSize(&valCol->data), &info, buffer, helperBuffer); + if (code) return code; + compressInfo->originalDataSize = info.originalSize; + compressInfo->compressedDataSize = info.compressedSize; + return 0; } -int32_t tValueColumnDecompress(SBuffer *buffer, SValueColumn *valCol) { - ASSERT(0); - // TODO +int32_t tValueColumnDecompress(void *input, int32_t inputSize, const SValueColumnCompressInfo *compressInfo, + SValueColumn *valCol, SBuffer *helperBuffer) { + int32_t code; + SCompressInfo info; + + tValueColumnClear(valCol); + + valCol->type = compressInfo->type; + if (IS_VAR_DATA_TYPE(valCol->type)) { + ASSERT(inputSize == compressInfo->compressedOffsetSize + compressInfo->compressedDataSize); + + info.dataType = TSDB_DATA_TYPE_INT; + info.cmprAlg = compressInfo->cmprAlg; + info.originalSize = compressInfo->originalOffsetSize; + info.compressedSize = compressInfo->compressedOffsetSize; + code = tDecompressData(input, compressInfo->compressedOffsetSize, &info, &valCol->offsets, helperBuffer); + if (code) return code; + + valCol->numOfValues = compressInfo->originalOffsetSize / tDataTypes[TSDB_DATA_TYPE_INT].bytes; + } else { + ASSERT(inputSize == compressInfo->compressedDataSize); + valCol->numOfValues = compressInfo->originalDataSize / tDataTypes[valCol->type].bytes; + } + + info.dataType = valCol->type; + info.cmprAlg = compressInfo->cmprAlg; + info.originalSize = compressInfo->originalDataSize; + info.compressedSize = compressInfo->compressedDataSize; + code = tDecompressData((char *)input + compressInfo->compressedOffsetSize, compressInfo->compressedDataSize, &info, + &valCol->data, helperBuffer); + if (code) return code; + + return 0; +} + +int32_t tValueColumnCompressInfoEncode(const SValueColumnCompressInfo *compressInfo, SBufferWriter *writer) { + int32_t code; + uint8_t formatVersion = 0; + + // format version + code = tBufferPutU8(writer, formatVersion); + if (code) return code; + + // struct info + code = tBufferPutI8(writer, compressInfo->cmprAlg); + if (code) return code; + code = tBufferPutI8(writer, compressInfo->type); + if (code) return code; + code = tBufferPutI32v(writer, compressInfo->originalDataSize); + if (code) return code; + code = tBufferPutI32v(writer, compressInfo->compressedDataSize); + if (code) return code; + code = tBufferPutI32v(writer, compressInfo->originalOffsetSize); + if (code) return code; + code = tBufferPutI32v(writer, compressInfo->compressedOffsetSize); + if (code) return code; + + return 0; +} + +int32_t tValueColumnCompressInfoDecode(SBufferReader *reader, SValueColumnCompressInfo *compressInfo) { + int32_t code; + uint8_t formatVersion; + + // format version + code = tBufferGetU8(reader, &formatVersion); + if (code) return code; + + if (formatVersion == 0) { + code = tBufferGetI8(reader, &compressInfo->cmprAlg); + if (code) return code; + code = tBufferGetI8(reader, &compressInfo->type); + if (code) return code; + code = tBufferGetI32v(reader, &compressInfo->originalDataSize); + if (code) return code; + code = tBufferGetI32v(reader, &compressInfo->compressedDataSize); + if (code) return code; + code = tBufferGetI32v(reader, &compressInfo->originalOffsetSize); + if (code) return code; + code = tBufferGetI32v(reader, &compressInfo->compressedOffsetSize); + if (code) return code; + } else { + return TSDB_CODE_INVALID_DATA_FMT; + } + + return 0; +} + +int32_t tCompressData(const void *input, int32_t inputSize, SCompressInfo *cmprInfo, SBuffer *buffer, + SBuffer *helperBuffer) { + cmprInfo->originalSize = inputSize; + + if (cmprInfo->cmprAlg == NO_COMPRESSION) { + cmprInfo->compressedSize = inputSize; + return tBufferAppend(buffer, input, inputSize); + } else { + SBuffer hBuffer; + SBuffer *extraBuffer = helperBuffer; + int32_t extraSizeNeeded = inputSize + COMP_OVERFLOW_BYTES; + + tBufferInit(&hBuffer); + + int32_t code = tBufferEnsureCapacity(buffer, buffer->size + extraSizeNeeded); + if (code) return code; + + if (cmprInfo->cmprAlg == TWO_STAGE_COMP) { + if (extraBuffer == NULL) { + extraBuffer = &hBuffer; + } + + code = tBufferEnsureCapacity(extraBuffer, extraSizeNeeded); + if (code) return code; + } + + cmprInfo->compressedSize = tDataTypes[cmprInfo->dataType].compFunc( // + (void *)input, // input + inputSize, // input size + inputSize / tDataTypes[cmprInfo->dataType].bytes, // number of elements + tBufferGetDataEnd(buffer), // output + extraSizeNeeded, // output size + cmprInfo->cmprAlg, // compression algorithm + tBufferGetData(extraBuffer), // helper buffer + extraSizeNeeded // extra buffer size + ); + if (cmprInfo->compressedSize < 0) { + tBufferDestroy(&hBuffer); + return TSDB_CODE_COMPRESS_ERROR; + } + + buffer->size += cmprInfo->compressedSize; + tBufferDestroy(&hBuffer); + } + + return 0; +} + +int32_t tDecompressData(const void *input, int32_t inputSize, const SCompressInfo *cmprInfo, SBuffer *buffer, + SBuffer *helperBuffer) { + if (cmprInfo->cmprAlg == NO_COMPRESSION) { + ASSERT(inputSize == cmprInfo->originalSize); + return tBufferAppend(buffer, input, inputSize); + } else { + SBuffer hBuffer; + SBuffer *extraBuffer = helperBuffer; + int32_t code; + + tBufferInit(&hBuffer); + + code = tBufferEnsureCapacity(buffer, cmprInfo->originalSize); + if (code) return code; + + if (cmprInfo->cmprAlg == TWO_STAGE_COMP) { + if (extraBuffer == NULL) { + extraBuffer = &hBuffer; + } + code = tBufferEnsureCapacity(extraBuffer, cmprInfo->originalSize + COMP_OVERFLOW_BYTES); + if (code) return code; + } + + int32_t decompressedSize = tDataTypes[cmprInfo->dataType].decompFunc( + (void *)input, // input + inputSize, // inputSize + cmprInfo->originalSize / tDataTypes[cmprInfo->dataType].bytes, // number of elements + tBufferGetDataEnd(buffer), // output + cmprInfo->originalSize, // output size + cmprInfo->cmprAlg, // compression algorithm + extraBuffer, // helper buffer + cmprInfo->originalSize + COMP_OVERFLOW_BYTES // extra buffer size + ); + if (decompressedSize < 0) { + tBufferDestroy(&hBuffer); + return TSDB_CODE_COMPRESS_ERROR; + } + ASSERT(decompressedSize == cmprInfo->originalSize); + buffer->size += decompressedSize; + tBufferDestroy(&hBuffer); + } + return 0; } diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index e61c767edc..bca157da3f 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -94,7 +94,7 @@ typedef struct STsdbRowKey STsdbRowKey; #define PAGE_CONTENT_SIZE(PAGE) ((PAGE) - sizeof(TSCKSUM)) #define LOGIC_TO_FILE_OFFSET(LOFFSET, PAGE) \ ((LOFFSET) / PAGE_CONTENT_SIZE(PAGE) * (PAGE) + (LOFFSET) % PAGE_CONTENT_SIZE(PAGE)) -#define FILE_TO_LOGIC_OFFSET(OFFSET, PAGE) ((OFFSET) / (PAGE)*PAGE_CONTENT_SIZE(PAGE) + (OFFSET) % (PAGE)) +#define FILE_TO_LOGIC_OFFSET(OFFSET, PAGE) ((OFFSET) / (PAGE) * PAGE_CONTENT_SIZE(PAGE) + (OFFSET) % (PAGE)) #define PAGE_OFFSET(PGNO, PAGE) (((PGNO)-1) * (PAGE)) #define OFFSET_PGNO(OFFSET, PAGE) ((OFFSET) / (PAGE) + 1) @@ -283,7 +283,7 @@ int32_t tsdbReadDelIdx(SDelFReader *pReader, SArray *aDelIdx); // tsdbRead.c ============================================================================================== int32_t tsdbTakeReadSnap2(STsdbReader *pReader, _query_reseek_func_t reseek, STsdbReadSnap **ppSnap); void tsdbUntakeReadSnap2(STsdbReader *pReader, STsdbReadSnap *pSnap, bool proactive); -int32_t tsdbGetTableSchema(SMeta* pMeta, int64_t uid, STSchema** pSchema, int64_t* suid); +int32_t tsdbGetTableSchema(SMeta *pMeta, int64_t uid, STSchema **pSchema, int64_t *suid); // tsdbMerge.c ============================================================================================== typedef struct { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index db9381277a..aa3f5608d4 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -2550,7 +2550,7 @@ static int32_t getNextRowFromFS(void *iter, TSDBROW **ppRow, bool *pIgnoreEarlie goto _err; } - state->iBrinRecord = BRIN_BLOCK_SIZE(&state->brinBlock) - 1; + state->iBrinRecord = state->brinBlock.numOfRecords - 1; state->state = SFSNEXTROW_BRINBLOCK; } diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 006c7aebc7..b49611bf02 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -195,6 +195,7 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB tsdbReadFile(reader->fd[TSDB_FTYPE_HEAD], brinBlk->dp->offset, reader->config->bufArr[0], brinBlk->dp->size, 0); TSDB_CHECK_CODE(code, lino, _exit); +#if 0 int32_t size = 0; tBrinBlockClear(brinBlock); for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) { @@ -218,6 +219,7 @@ int32_t tsdbDataFileReadBrinBlock(SDataFileReader *reader, const SBrinBlk *brinB size += brinBlk->size[j]; } +#endif _exit: if (code) { @@ -801,82 +803,109 @@ int32_t tsdbWriterUpdVerRange(SVersionRange *range, int64_t minVer, int64_t maxV int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAlg, int64_t *fileSize, TBrinBlkArray *brinBlkArray, uint8_t **bufArr, SVersionRange *range) { - if (BRIN_BLOCK_SIZE(brinBlock) == 0) return 0; + if (brinBlock->numOfRecords == 0) return 0; int32_t code; - // get SBrinBlk - SBrinBlk brinBlk[1] = { - { - .dp[0] = - { - .offset = *fileSize, - .size = 0, - }, - .minTbid = - { - .suid = TARRAY2_FIRST(brinBlock->suid), - .uid = TARRAY2_FIRST(brinBlock->uid), - }, - .maxTbid = - { - .suid = TARRAY2_LAST(brinBlock->suid), - .uid = TARRAY2_LAST(brinBlock->uid), - }, - .minVer = TARRAY2_FIRST(brinBlock->minVer), - .maxVer = TARRAY2_FIRST(brinBlock->minVer), - .numRec = BRIN_BLOCK_SIZE(brinBlock), - .cmprAlg = cmprAlg, - }, + SBrinBlk brinBlk = { + .numRec = brinBlock->numOfRecords, + .numOfPKs = brinBlock->numOfPKs, + .cmprAlg = cmprAlg, }; - for (int32_t i = 1; i < BRIN_BLOCK_SIZE(brinBlock); i++) { - if (brinBlk->minVer > TARRAY2_GET(brinBlock->minVer, i)) { - brinBlk->minVer = TARRAY2_GET(brinBlock->minVer, i); - } - if (brinBlk->maxVer < TARRAY2_GET(brinBlock->maxVer, i)) { - brinBlk->maxVer = TARRAY2_GET(brinBlock->maxVer, i); - } + brinBlk.dp->offset = *fileSize; + brinBlk.dp->size = 0; + + // minTbid + code = tBufferGet(&brinBlock->suids, 0, sizeof(int64_t), &brinBlk.minTbid.suid); + if (code) return code; + code = tBufferGet(&brinBlock->uids, 0, sizeof(int64_t), &brinBlk.minTbid.uid); + if (code) return code; + // maxTbid + code = tBufferGet(&brinBlock->suids, brinBlock->numOfRecords - 1, sizeof(int64_t), &brinBlk.maxTbid.suid); + if (code) return code; + code = tBufferGet(&brinBlock->uids, brinBlock->numOfRecords - 1, sizeof(int64_t), &brinBlk.maxTbid.uid); + if (code) return code; + // minVer and maxVer + const int64_t *minVers = (int64_t *)tBufferGetData(&brinBlock->minVers); + const int64_t *maxVers = (int64_t *)tBufferGetData(&brinBlock->maxVers); + brinBlk.minVer = minVers[0]; + brinBlk.maxVer = maxVers[0]; + for (int32_t i = 1; i < brinBlock->numOfRecords; i++) { + brinBlk.minVer = TMIN(brinBlk.minVer, minVers[i]); + brinBlk.maxVer = TMAX(brinBlk.maxVer, maxVers[i]); } - tsdbWriterUpdVerRange(range, brinBlk->minVer, brinBlk->maxVer); + tsdbWriterUpdVerRange(range, brinBlk.minVer, brinBlk.maxVer); // write to file - for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); i++) { - code = tsdbCmprData((uint8_t *)TARRAY2_DATA(brinBlock->dataArr1 + i), TARRAY2_DATA_LEN(brinBlock->dataArr1 + i), - TSDB_DATA_TYPE_BIGINT, brinBlk->cmprAlg, &bufArr[0], 0, &brinBlk->size[i], &bufArr[1]); + for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) { + SBuffer *bf = &brinBlock->buffers[i]; + SCompressInfo info = { + .cmprAlg = cmprAlg, + }; + + if (tBufferGetSize(bf) == 8 * brinBlock->numOfRecords) { + info.dataType = TSDB_DATA_TYPE_BIGINT; + } else if (tBufferGetSize(bf) == 4 * brinBlock->numOfRecords) { + info.dataType = TSDB_DATA_TYPE_INT; + } else { + ASSERT(0); + } + + tBufferClear(NULL /* TODO */); + code = tCompressData(tBufferGetData(bf), tBufferGetSize(bf), &info, NULL /* TODO */, NULL /* TODO*/); if (code) return code; - code = tsdbWriteFile(fd, *fileSize, bufArr[0], brinBlk->size[i]); + code = tsdbWriteFile(fd, *fileSize, NULL /* TODO */, info.compressedSize); if (code) return code; - brinBlk->dp->size += brinBlk->size[i]; - *fileSize += brinBlk->size[i]; + brinBlk.size[i] = info.compressedSize; + brinBlk.dp->size += info.compressedSize; + *fileSize += info.compressedSize; } - for (int32_t i = 0, j = ARRAY_SIZE(brinBlock->dataArr1); i < ARRAY_SIZE(brinBlock->dataArr2); i++, j++) { - code = tsdbCmprData((uint8_t *)TARRAY2_DATA(brinBlock->dataArr2 + i), TARRAY2_DATA_LEN(brinBlock->dataArr2 + i), - TSDB_DATA_TYPE_INT, brinBlk->cmprAlg, &bufArr[0], 0, &brinBlk->size[j], &bufArr[1]); - if (code) return code; + // write primary keys to file + if (brinBlock->numOfPKs > 0) { + SBufferWriter writer; + SValueColumnCompressInfo vcinfo = {.cmprAlg = cmprAlg}; - code = tsdbWriteFile(fd, *fileSize, bufArr[0], brinBlk->size[j]); - if (code) return code; + tBufferClear(NULL); + tBufferWriterInit(&writer, true, 0, NULL /* TODO */); - brinBlk->dp->size += brinBlk->size[j]; - *fileSize += brinBlk->size[j]; + for (int32_t i = 0; i < brinBlk.numOfPKs; i++) { + code = tValueColumnCompress(&brinBlock->firstKeyPKs[i], &vcinfo, NULL /* TODO */, NULL /* TODO */); + if (code) return code; + + code = tValueColumnCompressInfoEncode(&vcinfo, &writer); + if (code) return code; + } + + for (int32_t i = 0; i < brinBlk.numOfPKs; i++) { + code = tValueColumnCompress(&brinBlock->lastKeyPKs[i], &vcinfo, NULL /* TODO */, NULL /* TODO */); + if (code) return code; + + code = tValueColumnCompressInfoEncode(&vcinfo, &writer); + if (code) return code; + } + + // write to file + // TODO + ASSERT(0); + // code = tsdbWriteFile(fd, *fileSize, NULL /* TODO */, tBufferGetSize(NULL)); + // if (code) return code; + // *fileSize += tBufferGetSize(NULL); + // brinBlk->dp->size += tBufferGetSize(NULL); + + // code = tsdbWriteFile(fd, *fileSize, NULL /* TODO */, tBufferGetSize(NULL)); + // if (code) return code; + // *fileSize += tBufferGetSize(NULL); + // brinBlk->dp->size += tBufferGetSize(NULL); + tBufferWriterDestroy(writer); } -#if 0 - SBrinRecord record; - for (int32_t i = 0; i < BRIN_BLOCK_SIZE(brinBlock); i++) { - tBrinBlockGet(brinBlock, i, &record); - tsdbInfo("write brin block, block num:%04d, idx:%04d suid:%ld, uid:%ld, offset:%ld, numRow:%d, count:%d", - TARRAY2_SIZE(brinBlkArray), i, record.suid, record.uid, record.blockOffset, record.numRow, record.count); - } -#endif - // append to brinBlkArray - code = TARRAY2_APPEND_PTR(brinBlkArray, brinBlk); + code = TARRAY2_APPEND_PTR(brinBlkArray, &brinBlk); if (code) return code; tBrinBlockClear(brinBlock); @@ -885,7 +914,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl } static int32_t tsdbDataFileWriteBrinBlock(SDataFileWriter *writer) { - if (BRIN_BLOCK_SIZE(writer->brinBlock) == 0) return 0; + if (writer->brinBlock->numOfRecords == 0) return 0; int32_t code = 0; int32_t lino = 0; @@ -909,7 +938,7 @@ static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinR code = tBrinBlockPut(writer->brinBlock, record); TSDB_CHECK_CODE(code, lino, _exit); - if (BRIN_BLOCK_SIZE(writer->brinBlock) >= writer->config->maxRow) { + if ((writer->brinBlock->numOfRecords) >= writer->config->maxRow) { code = tsdbDataFileWriteBrinBlock(writer); TSDB_CHECK_CODE(code, lino, _exit); } @@ -932,10 +961,6 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData SBrinRecord record[1] = {{ .suid = bData->suid, .uid = bData->uid, - .firstKey = bData->aTSKEY[0], - .firstKeyVer = bData->aVersion[0], - .lastKey = bData->aTSKEY[bData->nRow - 1], - .lastKeyVer = bData->aVersion[bData->nRow - 1], .minVer = bData->aVersion[0], .maxVer = bData->aVersion[0], .blockOffset = writer->files[TSDB_FTYPE_DATA].size, @@ -947,6 +972,9 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData .count = 1, }}; + tsdbRowGetKey(&tsdbRowFromBlockData(bData, 0), &record->firstKey); + tsdbRowGetKey(&tsdbRowFromBlockData(bData, bData->nRow - 1), &record->lastKey); + for (int32_t i = 1; i < bData->nRow; ++i) { if (bData->aTSKEY[i] != bData->aTSKEY[i - 1]) { record->count++; @@ -1074,46 +1102,57 @@ _exit: return code; } -static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const TSDBKEY *key) { +static int32_t tsdbRowKeyCmprNullAsLargest(const STsdbRowKey *key1, const STsdbRowKey *key2) { + if (key1 == NULL) { + return 1; + } else if (key2 == NULL) { + return -1; + } else { + return tsdbRowKeyCmpr(key1, key2); + } +} + +static int32_t tsdbDataFileDoWriteTableOldData(SDataFileWriter *writer, const STsdbRowKey *key) { if (writer->ctx->tbHasOldData == false) return 0; - int32_t code = 0; - int32_t lino = 0; + int32_t code = 0; + int32_t lino = 0; + STsdbRowKey rowKey; for (;;) { for (;;) { // SBlockData for (; writer->ctx->blockDataIdx < writer->ctx->blockData->nRow; writer->ctx->blockDataIdx++) { - if (key->ts < writer->ctx->blockData->aTSKEY[writer->ctx->blockDataIdx] // - || (key->ts == writer->ctx->blockData->aTSKEY[writer->ctx->blockDataIdx] && - key->version < writer->ctx->blockData->aVersion[writer->ctx->blockDataIdx])) { - goto _exit; - } else { - TSDBROW row = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx); + TSDBROW row = tsdbRowFromBlockData(writer->ctx->blockData, writer->ctx->blockDataIdx); + + tsdbRowGetKey(&row, &rowKey); + if (tsdbRowKeyCmprNullAsLargest(&rowKey, key) < 0) { // key <= rowKey code = tsdbDataFileDoWriteTSRow(writer, &row); TSDB_CHECK_CODE(code, lino, _exit); + } else { + goto _exit; } } // SBrinBlock - if (writer->ctx->brinBlockIdx >= BRIN_BLOCK_SIZE(writer->ctx->brinBlock)) { + if (writer->ctx->brinBlockIdx >= writer->ctx->brinBlock->numOfRecords) { break; } - for (; writer->ctx->brinBlockIdx < BRIN_BLOCK_SIZE(writer->ctx->brinBlock); writer->ctx->brinBlockIdx++) { - if (TARRAY2_GET(writer->ctx->brinBlock->uid, writer->ctx->brinBlockIdx) != writer->ctx->tbid->uid) { + for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) { + SBrinRecord record; + tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record); + if (record.uid != writer->ctx->tbid->uid) { writer->ctx->tbHasOldData = false; goto _exit; } - if (key->ts < TARRAY2_GET(writer->ctx->brinBlock->firstKey, writer->ctx->brinBlockIdx) // - || (key->ts == TARRAY2_GET(writer->ctx->brinBlock->firstKey, writer->ctx->brinBlockIdx) && - key->version < TARRAY2_GET(writer->ctx->brinBlock->firstKeyVer, writer->ctx->brinBlockIdx))) { + if (tsdbRowKeyCmpr(key, &record.firstKey) < 0) { // key < record->firstKey goto _exit; } else { SBrinRecord record[1]; tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record); - if (key->ts > record->lastKey || (key->ts == record->lastKey && key->version > record->maxVer)) { + if (tsdbRowKeyCmprNullAsLargest(key, &record->lastKey) > 0) { // key > record->lastKey if (writer->blockData->nRow > 0) { code = tsdbDataFileDoWriteBlockData(writer, writer->blockData); TSDB_CHECK_CODE(code, lino, _exit); @@ -1166,16 +1205,10 @@ static int32_t tsdbDataFileDoWriteTSData(SDataFileWriter *writer, TSDBROW *row) int32_t lino = 0; if (writer->ctx->tbHasOldData) { - TSDBKEY key[1]; - if (row->type == TSDBROW_ROW_FMT) { - key->ts = row->pTSRow->ts; - key->version = row->version; - } else { - key->ts = row->pBlockData->aTSKEY[row->iRow]; - key->version = row->pBlockData->aVersion[row->iRow]; - } + STsdbRowKey key; + tsdbRowGetKey(row, &key); - code = tsdbDataFileDoWriteTableOldData(writer, key); + code = tsdbDataFileDoWriteTableOldData(writer, &key); TSDB_CHECK_CODE(code, lino, _exit); } @@ -1196,12 +1229,7 @@ static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) { int32_t lino = 0; if (writer->ctx->tbHasOldData) { - TSDBKEY key = { - .ts = TSKEY_MAX, - .version = VERSION_MAX, - }; - - code = tsdbDataFileDoWriteTableOldData(writer, &key); + code = tsdbDataFileDoWriteTableOldData(writer, NULL /* as largest key */); TSDB_CHECK_CODE(code, lino, _exit); ASSERT(writer->ctx->tbHasOldData == false); @@ -1229,35 +1257,32 @@ static int32_t tsdbDataFileWriteTableDataBegin(SDataFileWriter *writer, const TA TABLEID tbid1[1]; writer->ctx->tbHasOldData = false; while (writer->ctx->brinBlkArray) { // skip data of previous table - for (; writer->ctx->brinBlockIdx < BRIN_BLOCK_SIZE(writer->ctx->brinBlock); writer->ctx->brinBlockIdx++) { - TABLEID tbid2[1] = {{ - .suid = TARRAY2_GET(writer->ctx->brinBlock->suid, writer->ctx->brinBlockIdx), - .uid = TARRAY2_GET(writer->ctx->brinBlock->uid, writer->ctx->brinBlockIdx), - }}; + for (; writer->ctx->brinBlockIdx < writer->ctx->brinBlock->numOfRecords; writer->ctx->brinBlockIdx++) { + SBrinRecord record; + tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, &record); - if (tbid2->uid == tbid->uid) { + if (record.uid == tbid->uid) { writer->ctx->tbHasOldData = true; goto _begin; - } else if (tbid2->suid > tbid->suid || (tbid2->suid == tbid->suid && tbid2->uid > tbid->uid)) { + } else if (record.suid > tbid->suid || (record.suid == tbid->suid && record.uid > tbid->uid)) { goto _begin; } else { - if (tbid2->uid != writer->ctx->tbid->uid) { - if (drop && tbid1->uid == tbid2->uid) { + if (record.uid != writer->ctx->tbid->uid) { + if (drop && tbid1->uid == record.uid) { continue; - } else if (metaGetInfo(writer->config->tsdb->pVnode->pMeta, tbid2->uid, &info, NULL) != 0) { + } else if (metaGetInfo(writer->config->tsdb->pVnode->pMeta, record.uid, &info, NULL) != 0) { drop = true; - *tbid1 = *tbid2; + tbid1->suid = record.suid; + tbid1->uid = record.uid; continue; } else { drop = false; - writer->ctx->tbid[0] = *tbid2; + writer->ctx->tbid->suid = record.suid; + writer->ctx->tbid->uid = record.uid; } } - SBrinRecord record[1]; - tBrinBlockGet(writer->ctx->brinBlock, writer->ctx->brinBlockIdx, record); - - code = tsdbDataFileWriteBrinRecord(writer, record); + code = tsdbDataFileWriteBrinRecord(writer, &record); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -1836,12 +1861,7 @@ int32_t tsdbDataFileWriteBlockData(SDataFileWriter *writer, SBlockData *bData) { } if (writer->ctx->tbHasOldData) { - TSDBKEY key = { - .ts = bData->aTSKEY[0], - .version = bData->aVersion[0], - }; - - code = tsdbDataFileDoWriteTableOldData(writer, &key); + code = tsdbDataFileDoWriteTableOldData(writer, NULL /* as largest key */); TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/tsdb/tsdbIter.c b/source/dnode/vnode/src/tsdb/tsdbIter.c index a786ba8ffe..cbef113e4b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbIter.c +++ b/source/dnode/vnode/src/tsdb/tsdbIter.c @@ -147,12 +147,11 @@ static int32_t tsdbDataIterNext(STsdbIter *iter, const TABLEID *tbid) { } // SBrinBlock - if (iter->dataData->brinBlockIdx >= BRIN_BLOCK_SIZE(iter->dataData->brinBlock)) { + if (iter->dataData->brinBlockIdx >= iter->dataData->brinBlock->numOfRecords) { break; } - for (; iter->dataData->brinBlockIdx < BRIN_BLOCK_SIZE(iter->dataData->brinBlock); - iter->dataData->brinBlockIdx++) { + for (; iter->dataData->brinBlockIdx < iter->dataData->brinBlock->numOfRecords; iter->dataData->brinBlockIdx++) { SBrinRecord record[1]; tBrinBlockGet(iter->dataData->brinBlock, iter->dataData->brinBlockIdx, record); diff --git a/source/dnode/vnode/src/tsdb/tsdbRead2.c b/source/dnode/vnode/src/tsdb/tsdbRead2.c index 4144579cd9..8e1a6bd319 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead2.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead2.c @@ -591,7 +591,7 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN } // 1. time range check - if (pRecord->firstKey > w.ekey || pRecord->lastKey < w.skey) { + if (pRecord->firstKey.key.ts > w.ekey || pRecord->lastKey.key.ts < w.skey) { continue; } @@ -609,11 +609,11 @@ static int32_t doLoadFileBlock(STsdbReader* pReader, SArray* pIndexList, SBlockN return TSDB_CODE_OUT_OF_MEMORY; } - if (pScanInfo->filesetWindow.skey > pRecord->firstKey) { - pScanInfo->filesetWindow.skey = pRecord->firstKey; + if (pScanInfo->filesetWindow.skey > pRecord->firstKey.key.ts) { + pScanInfo->filesetWindow.skey = pRecord->firstKey.key.ts; } - if (pScanInfo->filesetWindow.ekey < pRecord->lastKey) { - pScanInfo->filesetWindow.ekey = pRecord->lastKey; + if (pScanInfo->filesetWindow.ekey < pRecord->lastKey.key.ts) { + pScanInfo->filesetWindow.ekey = pRecord->lastKey.key.ts; } pBlockNum->numOfBlocks += 1; @@ -744,9 +744,9 @@ static int32_t getEndPosInDataBlock(STsdbReader* pReader, SBlockData* pBlockData int32_t endPos = -1; bool asc = ASCENDING_TRAVERSE(pReader->info.order); - if (asc && pReader->info.window.ekey >= pRecord->lastKey) { + if (asc && pReader->info.window.ekey >= pRecord->lastKey.key.ts) { endPos = pRecord->numRow - 1; - } else if (!asc && pReader->info.window.skey <= pRecord->firstKey) { + } else if (!asc && pReader->info.window.skey <= pRecord->firstKey.key.ts) { endPos = 0; } else { int64_t key = asc ? pReader->info.window.ekey : pReader->info.window.skey; @@ -889,8 +889,12 @@ static void copyNumericCols(const SColData* pData, SFileBlockDumpInfo* pDumpInfo static void blockInfoToRecord(SBrinRecord* record, SFileDataBlockInfo* pBlockInfo) { record->uid = pBlockInfo->uid; - record->firstKey = pBlockInfo->firstKey; - record->lastKey = pBlockInfo->lastKey; + record->firstKey = (STsdbRowKey){ + .key = {.ts = pBlockInfo->firstKey, .numOfPKs = 0}, + }; + record->lastKey = (STsdbRowKey){ + .key = {.ts = pBlockInfo->lastKey, .numOfPKs = 0}, + }; record->minVer = pBlockInfo->minVer; record->maxVer = pBlockInfo->maxVer; record->blockOffset = pBlockInfo->blockOffset; @@ -933,9 +937,10 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { // row index of dump info remain the initial position, let's find the appropriate start position. if ((pDumpInfo->rowIndex == 0 && asc) || (pDumpInfo->rowIndex == pRecord->numRow - 1 && (!asc))) { - if (asc && pReader->info.window.skey <= pRecord->firstKey && pReader->info.verRange.minVer <= pRecord->minVer) { + if (asc && pReader->info.window.skey <= pRecord->firstKey.key.ts && + pReader->info.verRange.minVer <= pRecord->minVer) { // pDumpInfo->rowIndex = 0; - } else if (!asc && pReader->info.window.ekey >= pRecord->lastKey && + } else if (!asc && pReader->info.window.ekey >= pRecord->lastKey.key.ts && pReader->info.verRange.maxVer >= pRecord->maxVer) { // pDumpInfo->rowIndex = pRecord->numRow - 1; } else { // find the appropriate the start position in current block, and set it to be the current rowIndex @@ -948,8 +953,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { tsdbError( "%p failed to locate the start position in current block, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", minVer:%" PRId64 ", maxVer:%" PRId64 " %s", - pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, pRecord->minVer, - pRecord->maxVer, pReader->idStr); + pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey.key.ts, pRecord->lastKey.key.ts, + pRecord->minVer, pRecord->maxVer, pReader->idStr); return TSDB_CODE_INVALID_PARA; } @@ -1058,7 +1063,7 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { setBlockAllDumped(pDumpInfo, ts, pReader->info.order); } } else { - int64_t ts = asc ? pRecord->lastKey : pRecord->firstKey; + int64_t ts = asc ? pRecord->lastKey.key.ts : pRecord->firstKey.key.ts; setBlockAllDumped(pDumpInfo, ts, pReader->info.order); } @@ -1068,8 +1073,8 @@ static int32_t copyBlockDataToSDataBlock(STsdbReader* pReader) { int32_t unDumpedRows = asc ? pRecord->numRow - pDumpInfo->rowIndex : pDumpInfo->rowIndex + 1; tsdbDebug("%p copy file block to sdatablock, global index:%d, table index:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, remain:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", uid:%" PRIu64 " elapsed time:%.2f ms, %s", - pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, dumpedRows, - unDumpedRows, pRecord->minVer, pRecord->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr); + pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey.key.ts, pRecord->lastKey.key.ts, + dumpedRows, unDumpedRows, pRecord->minVer, pRecord->maxVer, pBlockInfo->uid, elapsedTime, pReader->idStr); return TSDB_CODE_SUCCESS; } @@ -1131,8 +1136,8 @@ static int32_t doLoadFileBlockData(STsdbReader* pReader, SDataBlockIter* pBlockI tsdbDebug("%p load file block into buffer, global index:%d, index in table block list:%d, brange:%" PRId64 "-%" PRId64 ", rows:%d, minVer:%" PRId64 ", maxVer:%" PRId64 ", elapsed time:%.2f ms, %s", - pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey, pRecord->lastKey, pRecord->numRow, - pRecord->minVer, pRecord->maxVer, elapsedTime, pReader->idStr); + pReader, pBlockIter->index, pBlockInfo->tbBlockIdx, pRecord->firstKey.key.ts, pRecord->lastKey.key.ts, + pRecord->numRow, pRecord->minVer, pRecord->maxVer, elapsedTime, pReader->idStr); pReader->cost.blockLoadTime += elapsedTime; pDumpInfo->allDumped = false; @@ -1235,9 +1240,9 @@ static int32_t setFileBlockActiveInBlockIter(STsdbReader* pReader, SDataBlockIte static bool overlapWithNeighborBlock2(SFileDataBlockInfo* pBlock, SBrinRecord* pRec, int32_t order) { // it is the last block in current file, no chance to overlap with neighbor blocks. if (ASCENDING_TRAVERSE(order)) { - return pBlock->lastKey == pRec->firstKey; + return pBlock->lastKey == pRec->firstKey.key.ts; } else { - return pBlock->firstKey == pRec->lastKey; + return pBlock->firstKey == pRec->lastKey.key.ts; } } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 33604d21de..dc16953d24 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -281,7 +281,7 @@ void initBrinRecordIter(SBrinRecordIter* pIter, SDataFileReader* pReader, SArray } SBrinRecord* getNextBrinRecord(SBrinRecordIter* pIter) { - if (pIter->blockIndex == -1 || (pIter->recordIndex + 1) >= TARRAY2_SIZE(pIter->block.numRow)) { + if (pIter->blockIndex == -1 || (pIter->recordIndex + 1) >= pIter->block.numOfRecords) { pIter->blockIndex += 1; if (pIter->blockIndex >= taosArrayGetSize(pIter->pBrinBlockList)) { return NULL; @@ -358,8 +358,8 @@ static int32_t fileDataBlockOrderCompar(const void* pLeft, const void* pRight, v static void recordToBlockInfo(SFileDataBlockInfo* pBlockInfo, SBrinRecord* record){ pBlockInfo->uid = record->uid; - pBlockInfo->firstKey = record->firstKey; - pBlockInfo->lastKey = record->lastKey; + pBlockInfo->firstKey = record->firstKey.key.ts; + pBlockInfo->lastKey = record->lastKey.key.ts; pBlockInfo->minVer = record->minVer; pBlockInfo->maxVer = record->maxVer; pBlockInfo->blockOffset = record->blockOffset; @@ -962,15 +962,15 @@ static bool doCheckDatablockOverlap(STableBlockScanInfo* pBlockScanInfo, const S for (int32_t i = startIndex; i < num; i += 1) { TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i); - if (p->ts >= pRecord->firstKey && p->ts <= pRecord->lastKey) { + if (p->ts >= pRecord->firstKey.key.ts && p->ts <= pRecord->lastKey.key.ts) { if (p->version >= pRecord->minVer) { return true; } - } else if (p->ts < pRecord->firstKey) { // p->ts < pBlock->minKey.ts + } else if (p->ts < pRecord->firstKey.key.ts) { // p->ts < pBlock->minKey.ts if (p->version >= pRecord->minVer) { if (i < num - 1) { TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1); - if (pnext->ts >= pRecord->firstKey) { + if (pnext->ts >= pRecord->firstKey.key.ts) { return true; } } else { // it must be the last point @@ -991,12 +991,12 @@ static bool doCheckDatablockOverlapWithoutVersion(STableBlockScanInfo* pBlockSca for (int32_t i = startIndex; i < num; i += 1) { TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, i); - if (p->ts >= pRecord->firstKey && p->ts <= pRecord->lastKey) { + if (p->ts >= pRecord->firstKey.key.ts && p->ts <= pRecord->lastKey.key.ts) { return true; - } else if (p->ts < pRecord->firstKey) { // p->ts < pBlock->minKey.ts + } else if (p->ts < pRecord->firstKey.key.ts) { // p->ts < pBlock->minKey.ts if (i < num - 1) { TSDBKEY* pnext = taosArrayGet(pBlockScanInfo->delSkyline, i + 1); - if (pnext->ts >= pRecord->firstKey) { + if (pnext->ts >= pRecord->firstKey.key.ts) { return true; } } @@ -1016,7 +1016,7 @@ bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecor // ts is not overlap TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0); TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline); - if (pRecord->firstKey > pLast->ts || pRecord->lastKey < pFirst->ts) { + if (pRecord->firstKey.key.ts > pLast->ts || pRecord->lastKey.key.ts < pFirst->ts) { return false; } @@ -1027,10 +1027,10 @@ bool overlapWithDelSkyline(STableBlockScanInfo* pBlockScanInfo, const SBrinRecor int32_t index = pBlockScanInfo->fileDelIndex; while (1) { TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index); - if (p->ts > pRecord->firstKey && index > 0) { + if (p->ts > pRecord->firstKey.key.ts && index > 0) { index -= 1; } else { // find the first point that is smaller than the minKey.ts of dataBlock. - if (p->ts == pRecord->firstKey && p->version < pRecord->maxVer && index > 0) { + if (p->ts == pRecord->firstKey.key.ts && p->version < pRecord->maxVer && index > 0) { index -= 1; } break; @@ -1049,7 +1049,7 @@ bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const // ts is not overlap TSDBKEY* pFirst = taosArrayGet(pBlockScanInfo->delSkyline, 0); TSDBKEY* pLast = taosArrayGetLast(pBlockScanInfo->delSkyline); - if (pRecord->firstKey > pLast->ts || pRecord->lastKey < pFirst->ts) { + if (pRecord->firstKey.key.ts > pLast->ts || pRecord->lastKey.key.ts < pFirst->ts) { return false; } @@ -1060,10 +1060,10 @@ bool overlapWithDelSkylineWithoutVer(STableBlockScanInfo* pBlockScanInfo, const int32_t index = pBlockScanInfo->fileDelIndex; while (1) { TSDBKEY* p = taosArrayGet(pBlockScanInfo->delSkyline, index); - if (p->ts > pRecord->firstKey && index > 0) { + if (p->ts > pRecord->firstKey.key.ts && index > 0) { index -= 1; } else { // find the first point that is smaller than the minKey.ts of dataBlock. - if (p->ts == pRecord->firstKey && index > 0) { + if (p->ts == pRecord->firstKey.key.ts && index > 0) { index -= 1; } break; diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index 876c0df4a0..4bf1d9586c 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -97,10 +97,24 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * SBrinRecord record = { .suid = pBlockIdx->suid, .uid = pBlockIdx->uid, - .firstKey = dataBlk->minKey.ts, - .firstKeyVer = dataBlk->minKey.version, - .lastKey = dataBlk->maxKey.ts, - .lastKeyVer = dataBlk->maxKey.version, + .firstKey = + (STsdbRowKey){ + .key = + (SRowKey){ + .ts = dataBlk->minKey.ts, + .numOfPKs = 0, + }, + .version = dataBlk->minKey.version, + }, + .lastKey = + (STsdbRowKey){ + .key = + (SRowKey){ + .ts = dataBlk->maxKey.ts, + .numOfPKs = 0, + }, + .version = dataBlk->maxKey.version, + }, .minVer = dataBlk->minVer, .maxVer = dataBlk->maxVer, .blockOffset = dataBlk->aSubBlock->offset, @@ -119,7 +133,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * code = tBrinBlockPut(ctx->brinBlock, &record); TSDB_CHECK_CODE(code, lino, _exit); - if (BRIN_BLOCK_SIZE(ctx->brinBlock) >= ctx->maxRow) { + if (ctx->brinBlock->numOfRecords >= ctx->maxRow) { SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->brinBlkArray, ctx->bufArr, &range); @@ -128,7 +142,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader * } } - if (BRIN_BLOCK_SIZE(ctx->brinBlock) > 0) { + if (ctx->brinBlock->numOfRecords > 0) { SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->brinBlkArray, ctx->bufArr, &range); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.c b/source/dnode/vnode/src/tsdb/tsdbUtil2.c index e938caa118..38d4e18669 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.c @@ -105,56 +105,230 @@ int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecor // SBrinRecord ---------- int32_t tBrinBlockInit(SBrinBlock *brinBlock) { - for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); ++i) { - TARRAY2_INIT(&brinBlock->dataArr1[i]); + brinBlock->numOfPKs = 0; + brinBlock->numOfRecords = 0; + for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) { + tBufferInit(&brinBlock->buffers[i]); } - for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr2); ++i) { - TARRAY2_INIT(&brinBlock->dataArr2[i]); + for (int32_t i = 0; i < TD_MAX_PRIMARY_KEY_COL; ++i) { + tValueColumnInit(&brinBlock->firstKeyPKs[i]); + tValueColumnInit(&brinBlock->lastKeyPKs[i]); } return 0; } int32_t tBrinBlockDestroy(SBrinBlock *brinBlock) { - for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); ++i) { - TARRAY2_DESTROY(&brinBlock->dataArr1[i], NULL); + brinBlock->numOfPKs = 0; + brinBlock->numOfRecords = 0; + for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) { + tBufferDestroy(&brinBlock->buffers[i]); } - for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr2); ++i) { - TARRAY2_DESTROY(&brinBlock->dataArr2[i], NULL); + for (int32_t i = 0; i < TD_MAX_PRIMARY_KEY_COL; ++i) { + tValueColumnDestroy(&brinBlock->firstKeyPKs[i]); + tValueColumnDestroy(&brinBlock->lastKeyPKs[i]); } return 0; } int32_t tBrinBlockClear(SBrinBlock *brinBlock) { - for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); ++i) { - TARRAY2_CLEAR(&brinBlock->dataArr1[i], NULL); + brinBlock->numOfPKs = 0; + brinBlock->numOfRecords = 0; + for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) { + tBufferClear(&brinBlock->buffers[i]); } - for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr2); ++i) { - TARRAY2_CLEAR(&brinBlock->dataArr2[i], NULL); + for (int32_t i = 0; i < TD_MAX_PRIMARY_KEY_COL; ++i) { + tValueColumnClear(&brinBlock->firstKeyPKs[i]); + tValueColumnClear(&brinBlock->lastKeyPKs[i]); } return 0; } int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) { int32_t code; - for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); ++i) { - code = TARRAY2_APPEND(&brinBlock->dataArr1[i], record->dataArr1[i]); + + ASSERT(record->firstKey.key.numOfPKs == record->lastKey.key.numOfPKs); + + if (brinBlock->numOfRecords == 0) { + brinBlock->numOfPKs = record->firstKey.key.numOfPKs; + } + + ASSERT(brinBlock->numOfPKs == record->firstKey.key.numOfPKs); + code = tBufferAppend(&brinBlock->suids, &record->suid, sizeof(record->suid)); + if (code) return code; + code = tBufferAppend(&brinBlock->uids, &record->uid, sizeof(record->uid)); + if (code) return code; + code = tBufferAppend(&brinBlock->firstKeyTimestamps, &record->firstKey.key.ts, sizeof(record->firstKey.key.ts)); + if (code) return code; + code = tBufferAppend(&brinBlock->firstKeyVersions, &record->firstKey.version, sizeof(record->firstKey.version)); + if (code) return code; + for (int32_t i = 0; i < record->firstKey.key.numOfPKs; ++i) { + code = tValueColumnAppend(&brinBlock->firstKeyPKs[i], &record->firstKey.key.pks[i]); if (code) return code; } - for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr2); ++i) { - code = TARRAY2_APPEND(&brinBlock->dataArr2[i], record->dataArr2[i]); + code = tBufferAppend(&brinBlock->lastKeyTimestamps, &record->lastKey.key.ts, sizeof(record->lastKey.key.ts)); + if (code) return code; + code = tBufferAppend(&brinBlock->lastKeyVersions, &record->lastKey.version, sizeof(record->lastKey.version)); + if (code) return code; + for (int32_t i = 0; i < record->lastKey.key.numOfPKs; ++i) { + code = tValueColumnAppend(&brinBlock->lastKeyPKs[i], &record->lastKey.key.pks[i]); if (code) return code; } + code = tBufferAppend(&brinBlock->minVers, &record->minVer, sizeof(record->minVer)); + if (code) return code; + code = tBufferAppend(&brinBlock->maxVers, &record->maxVer, sizeof(record->maxVer)); + if (code) return code; + code = tBufferAppend(&brinBlock->blockOffsets, &record->blockOffset, sizeof(record->blockOffset)); + if (code) return code; + code = tBufferAppend(&brinBlock->smaOffsets, &record->smaOffset, sizeof(record->smaOffset)); + if (code) return code; + code = tBufferAppend(&brinBlock->blockSizes, &record->blockSize, sizeof(record->blockSize)); + if (code) return code; + code = tBufferAppend(&brinBlock->blockKeySizes, &record->blockKeySize, sizeof(record->blockKeySize)); + if (code) return code; + code = tBufferAppend(&brinBlock->smaSizes, &record->smaSize, sizeof(record->smaSize)); + if (code) return code; + code = tBufferAppend(&brinBlock->numRows, &record->numRow, sizeof(record->numRow)); + if (code) return code; + code = tBufferAppend(&brinBlock->counts, &record->count, sizeof(record->count)); + if (code) return code; + + brinBlock->numOfRecords++; + return 0; } int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record) { - if (idx >= BRIN_BLOCK_SIZE(brinBlock)) return TSDB_CODE_OUT_OF_RANGE; - for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr1); ++i) { - record->dataArr1[i] = TARRAY2_GET(&brinBlock->dataArr1[i], idx); + int32_t code; + + if (idx < 0 || idx >= brinBlock->numOfRecords) { + return TSDB_CODE_OUT_OF_RANGE; } - for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->dataArr2); ++i) { - record->dataArr2[i] = TARRAY2_GET(&brinBlock->dataArr2[i], idx); + + code = tBufferGet(&brinBlock->suids, idx, sizeof(record->suid), &record->suid); + if (code) return code; + code = tBufferGet(&brinBlock->uids, idx, sizeof(record->uid), &record->uid); + if (code) return code; + code = tBufferGet(&brinBlock->firstKeyTimestamps, idx, sizeof(record->firstKey.key.ts), &record->firstKey.key.ts); + if (code) return code; + code = tBufferGet(&brinBlock->firstKeyVersions, idx, sizeof(record->firstKey.version), &record->firstKey.version); + if (code) return code; + for (record->firstKey.key.numOfPKs = 0; record->firstKey.key.numOfPKs < brinBlock->numOfPKs; + record->firstKey.key.numOfPKs++) { + code = tValueColumnGet(&brinBlock->firstKeyPKs[record->firstKey.key.numOfPKs], idx, + &record->firstKey.key.pks[record->firstKey.key.numOfPKs]); + if (code) return code; } + code = tBufferGet(&brinBlock->lastKeyTimestamps, idx, sizeof(record->lastKey.key.ts), &record->lastKey.key.ts); + if (code) return code; + code = tBufferGet(&brinBlock->lastKeyVersions, idx, sizeof(record->lastKey.version), &record->lastKey.version); + if (code) return code; + for (record->lastKey.key.numOfPKs = 0; record->lastKey.key.numOfPKs < brinBlock->numOfPKs; + record->lastKey.key.numOfPKs++) { + code = tValueColumnGet(&brinBlock->lastKeyPKs[record->lastKey.key.numOfPKs], idx, + &record->lastKey.key.pks[record->lastKey.key.numOfPKs]); + if (code) return code; + } + code = tBufferGet(&brinBlock->minVers, idx, sizeof(record->minVer), &record->minVer); + if (code) return code; + code = tBufferGet(&brinBlock->maxVers, idx, sizeof(record->maxVer), &record->maxVer); + if (code) return code; + code = tBufferGet(&brinBlock->blockOffsets, idx, sizeof(record->blockOffset), &record->blockOffset); + if (code) return code; + code = tBufferGet(&brinBlock->smaOffsets, idx, sizeof(record->smaOffset), &record->smaOffset); + if (code) return code; + code = tBufferGet(&brinBlock->blockSizes, idx, sizeof(record->blockSize), &record->blockSize); + if (code) return code; + code = tBufferGet(&brinBlock->blockKeySizes, idx, sizeof(record->blockKeySize), &record->blockKeySize); + if (code) return code; + code = tBufferGet(&brinBlock->smaSizes, idx, sizeof(record->smaSize), &record->smaSize); + if (code) return code; + code = tBufferGet(&brinBlock->numRows, idx, sizeof(record->numRow), &record->numRow); + if (code) return code; + code = tBufferGet(&brinBlock->counts, idx, sizeof(record->count), &record->count); + if (code) return code; + + return 0; +} + +int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer) { + int32_t code; + SBuffer *helperBuffer = NULL; // TODO + + brinBlk->dp[0].size = 0; + brinBlk->numRec = brinBlock->numOfRecords; + brinBlk->numOfPKs = brinBlock->numOfPKs; + + // minTbid + code = tBufferGet(&brinBlock->suids, 0, sizeof(brinBlk->minTbid.suid), &brinBlk->minTbid.suid); + if (code) return code; + code = tBufferGet(&brinBlock->uids, 0, sizeof(brinBlk->minTbid.uid), &brinBlk->minTbid.uid); + if (code) return code; + // maxTbid + code = + tBufferGet(&brinBlock->suids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.suid), &brinBlk->maxTbid.suid); + if (code) return code; + code = tBufferGet(&brinBlock->uids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.uid), &brinBlk->maxTbid.uid); + if (code) return code; + // minVer and maxVer + const int64_t *minVers = (int64_t *)tBufferGetData(&brinBlock->minVers); + const int64_t *maxVers = (int64_t *)tBufferGetData(&brinBlock->maxVers); + brinBlk->minVer = minVers[0]; + brinBlk->maxVer = maxVers[0]; + for (int32_t i = 1; i < brinBlock->numOfRecords; ++i) { + if (minVers[i] < brinBlk->minVer) brinBlk->minVer = minVers[i]; + if (maxVers[i] > brinBlk->maxVer) brinBlk->maxVer = maxVers[i]; + } + + // compress data + for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) { + SBuffer *bf = &brinBlock->buffers[i]; + SCompressInfo info = { + .cmprAlg = brinBlk->cmprAlg, + }; + + if (tBufferGetSize(bf) == 8 * brinBlock->numOfRecords) { + info.dataType = TSDB_DATA_TYPE_BIGINT; + } else if (tBufferGetSize(bf) == 4 * brinBlock->numOfRecords) { + info.dataType = TSDB_DATA_TYPE_INT; + } else { + ASSERT(0); + } + + code = tCompressData(tBufferGetData(bf), tBufferGetSize(bf), &info, buffer, helperBuffer); + if (code) return code; + brinBlk->size[i] = info.compressedSize; + brinBlk->dp[0].size += info.compressedSize; + } + + // encode primary keys + SValueColumnCompressInfo firstKeyPKsInfos[TD_MAX_PRIMARY_KEY_COL]; + SValueColumnCompressInfo lastKeyPKsInfos[TD_MAX_PRIMARY_KEY_COL]; + + for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) { + SValueColumn *vc = &brinBlock->firstKeyPKs[i]; + firstKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg; + code = tValueColumnCompress(vc, &firstKeyPKsInfos[i], buffer, helperBuffer); + if (code) return code; + } + + for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) { + SValueColumn *vc = &brinBlock->lastKeyPKs[i]; + lastKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg; + code = tValueColumnCompress(vc, &lastKeyPKsInfos[i], buffer, helperBuffer); + if (code) return code; + } + + return 0; +} + +int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock) { + // if (brinBlk->fmtVersion == 0) { + // return tBrinBlockDecodeVersion0(buffer, brinBlk, brinBlock); + // } else if (brinBlk->fmtVersion == 1) { + // return tBrinBlockDecodeVersion1(buffer, brinBlk, brinBlock); + // } else { + // ASSERT(0); + // } return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.h b/source/dnode/vnode/src/tsdb/tsdbUtil2.h index fa06368341..0b2d23f593 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.h +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.h @@ -112,52 +112,47 @@ int32_t tStatisBlockPut(STbStatisBlock *statisBlock, const STbStatisRecord *reco int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record); // SBrinRecord ---------- -typedef union { - struct { - int64_t dataArr1[10]; - int32_t dataArr2[5]; - }; - struct { - int64_t suid; - int64_t uid; - int64_t firstKey; - int64_t firstKeyVer; - int64_t lastKey; - int64_t lastKeyVer; - int64_t minVer; - int64_t maxVer; - int64_t blockOffset; - int64_t smaOffset; - int32_t blockSize; - int32_t blockKeySize; - int32_t smaSize; - int32_t numRow; - int32_t count; - }; +typedef struct { + int64_t suid; + int64_t uid; + STsdbRowKey firstKey; + STsdbRowKey lastKey; + int64_t minVer; + int64_t maxVer; + int64_t blockOffset; + int64_t smaOffset; + int32_t blockSize; + int32_t blockKeySize; + int32_t smaSize; + int32_t numRow; + int32_t count; } SBrinRecord; -typedef union { - struct { - TARRAY2(int64_t) dataArr1[10]; - TARRAY2(int32_t) dataArr2[5]; - }; - struct { - TARRAY2(int64_t) suid[1]; - TARRAY2(int64_t) uid[1]; - TARRAY2(int64_t) firstKey[1]; - TARRAY2(int64_t) firstKeyVer[1]; - TARRAY2(int64_t) lastKey[1]; - TARRAY2(int64_t) lastKeyVer[1]; - TARRAY2(int64_t) minVer[1]; - TARRAY2(int64_t) maxVer[1]; - TARRAY2(int64_t) blockOffset[1]; - TARRAY2(int64_t) smaOffset[1]; - TARRAY2(int32_t) blockSize[1]; - TARRAY2(int32_t) blockKeySize[1]; - TARRAY2(int32_t) smaSize[1]; - TARRAY2(int32_t) numRow[1]; - TARRAY2(int32_t) count[1]; +typedef struct { + int8_t numOfPKs; + int32_t numOfRecords; + union { + SBuffer buffers[15]; + struct { + SBuffer suids; + SBuffer uids; + SBuffer firstKeyTimestamps; + SBuffer firstKeyVersions; + SBuffer lastKeyTimestamps; + SBuffer lastKeyVersions; + SBuffer minVers; + SBuffer maxVers; + SBuffer blockOffsets; + SBuffer smaOffsets; + SBuffer blockSizes; + SBuffer blockKeySizes; + SBuffer smaSizes; + SBuffer numRows; + SBuffer counts; + }; }; + SValueColumn firstKeyPKs[TD_MAX_PRIMARY_KEY_COL]; + SValueColumn lastKeyPKs[TD_MAX_PRIMARY_KEY_COL]; } SBrinBlock; typedef struct { @@ -169,18 +164,21 @@ typedef struct { int32_t numRec; int32_t size[15]; int8_t cmprAlg; - int8_t rsvd[7]; + int8_t numOfPKs; // number of primary keys + int8_t rsvd[6]; } SBrinBlk; typedef TARRAY2(SBrinBlk) TBrinBlkArray; -#define BRIN_BLOCK_SIZE(db) TARRAY2_SIZE((db)->suid) +#define BRIN_BLOCK_SIZE(db) ((db)->numOfRecords) int32_t tBrinBlockInit(SBrinBlock *brinBlock); int32_t tBrinBlockDestroy(SBrinBlock *brinBlock); int32_t tBrinBlockClear(SBrinBlock *brinBlock); int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record); int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record); +int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer); +int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock); // other apis int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb);