diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index bfb52ec509..cddb8820d7 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -142,8 +142,8 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2); #define MIN_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) < 0) ? (KEY1) : (KEY2)) #define MAX_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) > 0) ? (KEY1) : (KEY2)) // SBlockCol -int32_t tPutBlockCol(uint8_t *p, void *ph); -int32_t tGetBlockCol(uint8_t *p, void *ph); +int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol); +int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol); int32_t tBlockColCmprFn(const void *p1, const void *p2); // SDataBlk void tDataBlkReset(SDataBlk *pBlock); @@ -175,15 +175,14 @@ int32_t tBlockDataUpdateRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTS int32_t tBlockDataTryUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, int64_t uid); int32_t tBlockDataUpsertRow(SBlockData *pBlockData, TSDBROW *pRow, STSchema *pTSchema, int64_t uid); void tBlockDataClear(SBlockData *pBlockData); -int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[], - int32_t aBufN[]); -int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uint8_t *aBuf[]); +int32_t tBlockDataCompress(SBlockData *bData, int8_t cmprAlg, SBuffer *buffers, SBuffer *assist); +int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *assist); SColData *tBlockDataGetColData(SBlockData *pBlockData, int16_t cid); int32_t tBlockDataAddColData(SBlockData *pBlockData, int16_t cid, int8_t type, int8_t cflag, SColData **ppColData); // SDiskDataHdr -int32_t tPutDiskDataHdr(uint8_t *p, const SDiskDataHdr *pHdr); -int32_t tGetDiskDataHdr(uint8_t *p, void *ph); +int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr); +int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr); // SDelIdx int32_t tPutDelIdx(uint8_t *p, void *ph); int32_t tGetDelIdx(uint8_t *p, void *ph); @@ -209,8 +208,8 @@ int32_t tsdbKeyFid(TSKEY key, int32_t minutes, int8_t precision); void tsdbFidKeyRange(int32_t fid, int32_t minutes, int8_t precision, TSKEY *minKey, TSKEY *maxKey); int32_t tsdbFidLevel(int32_t fid, STsdbKeepCfg *pKeepCfg, int64_t nowSec); int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SArray *aSkyline); -int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg); -int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg); +int32_t tPutColumnDataAgg(SBuffer *buffer, SColumnDataAgg *pColAgg); +int32_t tGetColumnDataAgg(SBufferReader *br, SColumnDataAgg *pColAgg); int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut, int32_t *szOut, uint8_t **ppBuf); int32_t tsdbDecmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t szOut, @@ -269,11 +268,6 @@ int32_t tsdbDataFReaderClose(SDataFReader **ppReader); int32_t tsdbReadBlockIdx(SDataFReader *pReader, SArray *aBlockIdx); int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *mDataBlk); int32_t tsdbReadSttBlk(SDataFReader *pReader, int32_t iStt, SArray *aSttBlk); -int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pBlock, SArray *aColumnDataAgg); -int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pBlock, SBlockData *pBlockData); -int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData); -int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData); -int32_t tsdbReadSttBlockEx(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData); // SDelFReader int32_t tsdbDelFReaderOpen(SDelFReader **ppReader, SDelFile *pFile, STsdb *pTsdb); int32_t tsdbDelFReaderClose(SDelFReader **ppReader); @@ -561,7 +555,10 @@ struct SDiskDataHdr { int32_t szBlkCol; int32_t nRow; int8_t cmprAlg; - int8_t numOfPKs; + + // fmtVer == 1 + int8_t numOfPKs; + SBlockCol primaryBlockCols[TD_MAX_PK_COLS]; }; struct SDelFile { diff --git a/source/dnode/vnode/src/tsdb/tsdbCache.c b/source/dnode/vnode/src/tsdb/tsdbCache.c index 2b19c79c6c..5c6c6c3e9d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCache.c +++ b/source/dnode/vnode/src/tsdb/tsdbCache.c @@ -2175,7 +2175,7 @@ static int32_t loadTombFromBlk(const TTombBlkArray *pTombBlkArray, SCacheRowsRea STombRecord record = {0}; bool finished = false; - for (int32_t k = 0; k < TARRAY2_SIZE(block.suid); ++k) { + for (int32_t k = 0; k < TOMB_BLOCK_SIZE(&block); ++k) { code = tTombBlockGet(&block, k, &record); if (code != TSDB_CODE_SUCCESS) { finished = true; diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index be9188480f..1d22d0fe68 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -326,7 +326,8 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe // SDiskDataHdr SBufferReader br = BUFFER_READER_INITIALIZER(0, &reader->buffers[0]); - br.offset += tGetDiskDataHdr((uint8_t *)BR_PTR(&br), &hdr); + code = tGetDiskDataHdr(&br, &hdr); + TSDB_CHECK_CODE(code, lino, _exit); tBlockDataReset(bData); bData->suid = hdr.suid; @@ -368,7 +369,8 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe break; } - br.offset += tGetBlockCol((uint8_t *)BR_PTR(&br), &blockCol); + code = tGetBlockCol(&br, &blockCol); + TSDB_CHECK_CODE(code, lino, _exit); } if (cid < blockCol.cid) { @@ -412,7 +414,8 @@ int32_t tsdbDataFileReadBlockSma(SDataFileReader *reader, const SBrinRecord *rec while (br.offset < record->smaSize) { SColumnDataAgg sma[1]; - br.offset += tGetColumnDataAgg((uint8_t *)BR_PTR(&br), sma); + code = tGetColumnDataAgg(&br, sma); + TSDB_CHECK_CODE(code, lino, _exit); code = TARRAY2_APPEND_PTR(columnDataAggArray, sma); TSDB_CHECK_CODE(code, lino, _exit); @@ -705,32 +708,35 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl int32_t code; SBrinBlk brinBlk = { + .dp[0] = + { + .offset = *fileSize, + .size = 0, + }, .numRec = brinBlock->numOfRecords, .numOfPKs = brinBlock->numOfPKs, .cmprAlg = cmprAlg, }; + for (int i = 0; i < brinBlock->numOfRecords; i++) { + SBrinRecord record; - brinBlk.dp->offset = *fileSize; - brinBlk.dp->size = 0; - - // minTbid - code = tBufferGet(&brinBlock->suids, 0, sizeof(int64_t), &brinBlk.minTbid.suid); - if (code) return code; - code = tBufferGet(&brinBlock->uids, 0, sizeof(int64_t), &brinBlk.minTbid.uid); - if (code) return code; - // maxTbid - code = tBufferGet(&brinBlock->suids, brinBlock->numOfRecords - 1, sizeof(int64_t), &brinBlk.maxTbid.suid); - if (code) return code; - code = tBufferGet(&brinBlock->uids, brinBlock->numOfRecords - 1, sizeof(int64_t), &brinBlk.maxTbid.uid); - if (code) return code; - // minVer and maxVer - const int64_t *minVers = (int64_t *)tBufferGetData(&brinBlock->minVers); - const int64_t *maxVers = (int64_t *)tBufferGetData(&brinBlock->maxVers); - brinBlk.minVer = minVers[0]; - brinBlk.maxVer = maxVers[0]; - for (int32_t i = 1; i < brinBlock->numOfRecords; i++) { - brinBlk.minVer = TMIN(brinBlk.minVer, minVers[i]); - brinBlk.maxVer = TMAX(brinBlk.maxVer, maxVers[i]); + tBrinBlockGet(brinBlock, i, &record); + if (i == 0) { + brinBlk.minTbid.suid = record.suid; + brinBlk.minTbid.uid = record.uid; + brinBlk.minVer = record.minVer; + brinBlk.maxVer = record.maxVer; + } + if (i == brinBlock->numOfRecords - 1) { + brinBlk.maxTbid.suid = record.suid; + brinBlk.maxTbid.uid = record.uid; + } + if (record.minVer < brinBlk.minVer) { + brinBlk.minVer = record.minVer; + } + if (record.maxVer > brinBlk.maxVer) { + brinBlk.maxVer = record.maxVer; + } } tsdbWriterUpdVerRange(range, brinBlk.minVer, brinBlk.maxVer); @@ -764,6 +770,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl // write primary keys to file if (brinBlock->numOfPKs > 0) { +#if 0 SBufferWriter writer; SValueColumnCompressInfo vcinfo = {.cmprAlg = cmprAlg}; @@ -798,7 +805,7 @@ int32_t tsdbFileWriteBrinBlock(STsdbFD *fd, SBrinBlock *brinBlock, int8_t cmprAl // if (code) return code; // *fileSize += tBufferGetSize(NULL); // brinBlk->dp->size += tBufferGetSize(NULL); - tBufferWriterDestroy(writer); +#endif } // append to brinBlkArray @@ -832,8 +839,18 @@ static int32_t tsdbDataFileWriteBrinRecord(SDataFileWriter *writer, const SBrinR int32_t code = 0; int32_t lino = 0; - code = tBrinBlockPut(writer->brinBlock, record); - TSDB_CHECK_CODE(code, lino, _exit); + for (;;) { + code = tBrinBlockPut(writer->brinBlock, record); + if (code == TSDB_CODE_INVALID_PARA) { + // different records with different primary keys + code = tsdbDataFileWriteBrinBlock(writer); + TSDB_CHECK_CODE(code, lino, _exit); + continue; + } else { + TSDB_CHECK_CODE(code, lino, _exit); + } + break; + } if ((writer->brinBlock->numOfRecords) >= writer->config->maxRow) { code = tsdbDataFileWriteBrinBlock(writer); @@ -873,7 +890,7 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData tsdbRowGetKey(&tsdbRowFromBlockData(bData, bData->nRow - 1), &record->lastKey); for (int32_t i = 1; i < bData->nRow; ++i) { - if (bData->aTSKEY[i] != bData->aTSKEY[i - 1]) { + if (tsdbRowCmprFn(&tsdbRowFromBlockData(bData, i - 1), &tsdbRowFromBlockData(bData, i)) != 0) { record->count++; } if (bData->aVersion[i] < record->minVer) { @@ -887,24 +904,21 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData tsdbWriterUpdVerRange(&writer->ctx->range, record->minVer, record->maxVer); // to .data file - int32_t sizeArr[5] = {0}; - - code = tCmprBlockData(bData, writer->config->cmprAlg, NULL, NULL, writer->config->bufArr, sizeArr); + code = tBlockDataCompress(bData, writer->config->cmprAlg, writer->buffers, &writer->buffers[4]); TSDB_CHECK_CODE(code, lino, _exit); - record->blockKeySize = sizeArr[3] + sizeArr[2]; - record->blockSize = sizeArr[0] + sizeArr[1] + record->blockKeySize; + record->blockKeySize = writer->buffers[0].size + writer->buffers[1].size; + record->blockSize = record->blockKeySize + writer->buffers[2].size + writer->buffers[3].size; - for (int32_t i = 3; i >= 0; --i) { - if (sizeArr[i]) { - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size, writer->config->bufArr[i], - sizeArr[i]); - TSDB_CHECK_CODE(code, lino, _exit); - writer->files[TSDB_FTYPE_DATA].size += sizeArr[i]; - } + for (int i = 0; i < 4; i++) { + code = tsdbWriteFile(writer->fd[TSDB_FTYPE_DATA], writer->files[TSDB_FTYPE_DATA].size, writer->buffers[i].data, + tBufferGetSize(&writer->buffers[i])); + TSDB_CHECK_CODE(code, lino, _exit); + writer->files[TSDB_FTYPE_DATA].size += tBufferGetSize(&writer->buffers[i]); } // to .sma file + tBufferClear(&writer->buffers[0]); for (int32_t i = 0; i < bData->nColData; ++i) { SColData *colData = bData->aColData + i; if ((colData->cflag & COL_SMA_ON) == 0 || ((colData->flag & HAS_VALUE) == 0)) continue; @@ -912,17 +926,13 @@ static int32_t tsdbDataFileDoWriteBlockData(SDataFileWriter *writer, SBlockData SColumnDataAgg sma[1] = {{.colId = colData->cid}}; tColDataCalcSMA[colData->type](colData, &sma->sum, &sma->max, &sma->min, &sma->numOfNull); - int32_t size = tPutColumnDataAgg(NULL, sma); - - code = tRealloc(&writer->config->bufArr[0], record->smaSize + size); + code = tPutColumnDataAgg(&writer->buffers[0], sma); TSDB_CHECK_CODE(code, lino, _exit); - - tPutColumnDataAgg(writer->config->bufArr[0] + record->smaSize, sma); - record->smaSize += size; } + record->smaSize = tBufferGetSize(&writer->buffers[0]); if (record->smaSize > 0) { - code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record->smaOffset, writer->config->bufArr[0], record->smaSize); + code = tsdbWriteFile(writer->fd[TSDB_FTYPE_SMA], record->smaOffset, writer->buffers[0].data, record->smaSize); TSDB_CHECK_CODE(code, lino, _exit); writer->files[TSDB_FTYPE_SMA].size += record->smaSize; } @@ -973,7 +983,7 @@ _exit: return code; } -static int32_t tsdbRowKeyCmprNullAsLargest(const STsdbRowKey *key1, const STsdbRowKey *key2) { +static FORCE_INLINE int32_t tsdbRowKeyCmprNullAsLargest(const STsdbRowKey *key1, const STsdbRowKey *key2) { if (key1 == NULL) { return 1; } else if (key2 == NULL) { @@ -1100,7 +1110,7 @@ static int32_t tsdbDataFileWriteTableDataEnd(SDataFileWriter *writer) { int32_t lino = 0; if (writer->ctx->tbHasOldData) { - code = tsdbDataFileDoWriteTableOldData(writer, NULL /* as largest key */); + code = tsdbDataFileDoWriteTableOldData(writer, NULL /* as the largest key */); TSDB_CHECK_CODE(code, lino, _exit); ASSERT(writer->ctx->tbHasOldData == false); @@ -1198,11 +1208,12 @@ int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFoote } int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, - TTombBlkArray *tombBlkArray, uint8_t **bufArr, SVersionRange *range) { + TTombBlkArray *tombBlkArray, SBuffer *buffers, SVersionRange *range) { int32_t code; if (TOMB_BLOCK_SIZE(tombBlock) == 0) return 0; +#if 0 STombBlk tombBlk[1] = {{ .dp[0] = { @@ -1224,31 +1235,60 @@ int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAl .numRec = TOMB_BLOCK_SIZE(tombBlock), .cmprAlg = cmprAlg, }}; +#endif + STombBlk tombBlk = { + .dp[0] = + { + .offset = *fileSize, + .size = 0, + }, + .numRec = TOMB_BLOCK_SIZE(tombBlock), + .cmprAlg = cmprAlg, + }; + for (int i = 0; i < TOMB_BLOCK_SIZE(tombBlock); i++) { + STombRecord record; + tTombBlockGet(tombBlock, i, &record); - for (int32_t i = 1; i < TOMB_BLOCK_SIZE(tombBlock); i++) { - if (tombBlk->minVer > TARRAY2_GET(tombBlock->version, i)) { - tombBlk->minVer = TARRAY2_GET(tombBlock->version, i); + if (i == 0) { + tombBlk.minTbid.suid = record.suid; + tombBlk.minTbid.uid = record.uid; + tombBlk.minVer = record.version; + tombBlk.maxVer = record.version; } - if (tombBlk->maxVer < TARRAY2_GET(tombBlock->version, i)) { - tombBlk->maxVer = TARRAY2_GET(tombBlock->version, i); + if (i == TOMB_BLOCK_SIZE(tombBlock) - 1) { + tombBlk.maxTbid.suid = record.suid; + tombBlk.maxTbid.uid = record.uid; + } + if (record.version < tombBlk.minVer) { + tombBlk.minVer = record.version; + } + if (record.version > tombBlk.maxVer) { + tombBlk.maxVer = record.version; } } - tsdbWriterUpdVerRange(range, tombBlk->minVer, tombBlk->maxVer); + tsdbWriterUpdVerRange(range, tombBlk.minVer, tombBlk.maxVer); - for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->dataArr); i++) { - code = tsdbCmprData((uint8_t *)TARRAY2_DATA(&tombBlock->dataArr[i]), TARRAY2_DATA_LEN(&tombBlock->dataArr[i]), - TSDB_DATA_TYPE_BIGINT, tombBlk->cmprAlg, &bufArr[0], 0, &tombBlk->size[i], &bufArr[1]); + for (int32_t i = 0; i < ARRAY_SIZE(tombBlock->buffers); i++) { + tBufferClear(&buffers[0]); + + SCompressInfo cinfo = { + .cmprAlg = cmprAlg, + .dataType = TSDB_DATA_TYPE_BIGINT, + .originalSize = tombBlock->buffers[i].size, + }; + code = tCompressDataToBuffer(tombBlock->buffers[i].data, cinfo.originalSize, &cinfo, &buffers[0], &buffers[1]); if (code) return code; - code = tsdbWriteFile(fd, *fileSize, bufArr[0], tombBlk->size[i]); + code = tsdbWriteFile(fd, *fileSize, buffers[0].data, buffers[0].size); if (code) return code; - tombBlk->dp->size += tombBlk->size[i]; - *fileSize += tombBlk->size[i]; + tombBlk.size[i] = cinfo.compressedSize; + tombBlk.dp->size += tombBlk.size[i]; + *fileSize += tombBlk.size[i]; } - code = TARRAY2_APPEND_PTR(tombBlkArray, tombBlk); + code = TARRAY2_APPEND_PTR(tombBlkArray, &tombBlk); if (code) return code; tTombBlockClear(tombBlock); @@ -1276,7 +1316,7 @@ static int32_t tsdbDataFileDoWriteTombBlock(SDataFileWriter *writer) { int32_t lino = 0; code = tsdbFileWriteTombBlock(writer->fd[TSDB_FTYPE_TOMB], writer->tombBlock, writer->config->cmprAlg, - &writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->config->bufArr, + &writer->files[TSDB_FTYPE_TOMB].size, writer->tombBlkArray, writer->buffers, &writer->ctx->tombRange); TSDB_CHECK_CODE(code, lino, _exit); diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h index c2b9aa5260..a9bfa25f8d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.h @@ -102,7 +102,7 @@ int32_t tsdbFileWriteHeadFooter(STsdbFD *fd, int64_t *fileSize, const SHeadFoote // tomb int32_t tsdbDataFileWriteTombRecord(SDataFileWriter *writer, const STombRecord *record); int32_t tsdbFileWriteTombBlock(STsdbFD *fd, STombBlock *tombBlock, int8_t cmprAlg, int64_t *fileSize, - TTombBlkArray *tombBlkArray, uint8_t **bufArr, SVersionRange *range); + TTombBlkArray *tombBlkArray, SBuffer *buffers, SVersionRange *range); int32_t tsdbFileWriteTombBlk(STsdbFD *fd, const TTombBlkArray *tombBlkArray, SFDataPtr *ptr, int64_t *fileSize); int32_t tsdbFileWriteTombFooter(STsdbFD *fd, const STombFooter *footer, int64_t *fileSize); diff --git a/source/dnode/vnode/src/tsdb/tsdbIter.c b/source/dnode/vnode/src/tsdb/tsdbIter.c index cbef113e4b..42afe1cdc0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbIter.c +++ b/source/dnode/vnode/src/tsdb/tsdbIter.c @@ -254,9 +254,7 @@ _exit: static int32_t tsdbDataTombIterNext(STsdbIter *iter, const TABLEID *tbid) { while (!iter->noMoreData) { for (; iter->dataTomb->tombBlockIdx < TOMB_BLOCK_SIZE(iter->dataTomb->tombBlock); iter->dataTomb->tombBlockIdx++) { - iter->record->suid = TARRAY2_GET(iter->dataTomb->tombBlock->suid, iter->dataTomb->tombBlockIdx); - iter->record->uid = TARRAY2_GET(iter->dataTomb->tombBlock->uid, iter->dataTomb->tombBlockIdx); - iter->record->version = TARRAY2_GET(iter->dataTomb->tombBlock->version, iter->dataTomb->tombBlockIdx); + tTombBlockGet(iter->dataTomb->tombBlock, iter->dataTomb->tombBlockIdx, iter->record); if (iter->filterByVersion && (iter->record->version < iter->range[0] || iter->record->version > iter->range[1])) { continue; @@ -265,9 +263,6 @@ static int32_t tsdbDataTombIterNext(STsdbIter *iter, const TABLEID *tbid) { if (tbid && iter->record->suid == tbid->suid && iter->record->uid == tbid->uid) { continue; } - - iter->record->skey = TARRAY2_GET(iter->dataTomb->tombBlock->skey, iter->dataTomb->tombBlockIdx); - iter->record->ekey = TARRAY2_GET(iter->dataTomb->tombBlock->ekey, iter->dataTomb->tombBlockIdx); iter->dataTomb->tombBlockIdx++; goto _exit; } @@ -444,9 +439,7 @@ static int32_t tsdbMemTableIterClose(STsdbIter *iter) { return 0; } static int32_t tsdbSttTombIterNext(STsdbIter *iter, const TABLEID *tbid) { while (!iter->noMoreData) { for (; iter->sttTomb->tombBlockIdx < TOMB_BLOCK_SIZE(iter->sttTomb->tombBlock); iter->sttTomb->tombBlockIdx++) { - iter->record->suid = TARRAY2_GET(iter->sttTomb->tombBlock->suid, iter->sttTomb->tombBlockIdx); - iter->record->uid = TARRAY2_GET(iter->sttTomb->tombBlock->uid, iter->sttTomb->tombBlockIdx); - iter->record->version = TARRAY2_GET(iter->sttTomb->tombBlock->version, iter->sttTomb->tombBlockIdx); + tTombBlockGet(iter->sttTomb->tombBlock, iter->sttTomb->tombBlockIdx, iter->record); if (iter->filterByVersion && (iter->record->version < iter->range[0] || iter->record->version > iter->range[1])) { continue; @@ -456,8 +449,6 @@ static int32_t tsdbSttTombIterNext(STsdbIter *iter, const TABLEID *tbid) { continue; } - iter->record->skey = TARRAY2_GET(iter->sttTomb->tombBlock->skey, iter->sttTomb->tombBlockIdx); - iter->record->ekey = TARRAY2_GET(iter->sttTomb->tombBlock->ekey, iter->sttTomb->tombBlockIdx); iter->sttTomb->tombBlockIdx++; goto _exit; } diff --git a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c index 95ac0f0133..4c742c0572 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadUtil.c @@ -14,7 +14,6 @@ */ #include "tsdbReadUtil.h" -#include "osDef.h" #include "tsdb.h" #include "tsdbDataFileRW.h" #include "tsdbFS2.h" @@ -529,7 +528,7 @@ static int32_t doCheckTombBlock(STombBlock* pBlock, STsdbReader* pReader, int32_ pScanInfo->pFileDelData = taosArrayInit(4, sizeof(SDelData)); } - for (int32_t k = 0; k < TARRAY2_SIZE(pBlock->suid); ++k) { + for (int32_t k = 0; k < pBlock->numOfRecords; ++k) { code = tTombBlockGet(pBlock, k, &record); if (code != TSDB_CODE_SUCCESS) { *pRet = BLK_CHECK_QUIT; diff --git a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c index 1c6750173c..76a8ee0e2a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c +++ b/source/dnode/vnode/src/tsdb/tsdbReaderWriter.c @@ -632,235 +632,6 @@ _err: return code; } -int32_t tsdbReadBlockSma(SDataFReader *pReader, SDataBlk *pDataBlk, SArray *aColumnDataAgg) { - int32_t code = 0; - SSmaInfo *pSmaInfo = &pDataBlk->smaInfo; - - ASSERT(pSmaInfo->size > 0); - - taosArrayClear(aColumnDataAgg); - - // alloc - code = tRealloc(&pReader->aBuf[0], pSmaInfo->size); - if (code) goto _err; - - // read - code = tsdbReadFile(pReader->pSmaFD, pSmaInfo->offset, pReader->aBuf[0], pSmaInfo->size, 0); - if (code) goto _err; - - // decode - int32_t n = 0; - while (n < pSmaInfo->size) { - SColumnDataAgg sma; - n += tGetColumnDataAgg(pReader->aBuf[0] + n, &sma); - - if (taosArrayPush(aColumnDataAgg, &sma) == NULL) { - code = TSDB_CODE_OUT_OF_MEMORY; - goto _err; - } - } - ASSERT(n == pSmaInfo->size); - return code; - -_err: - tsdbError("vgId:%d, tsdb read block sma failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - return code; -} - -static int32_t tsdbReadBlockDataImpl(SDataFReader *pReader, SBlockInfo *pBlkInfo, SBlockData *pBlockData, - int32_t iStt) { - int32_t code = 0; - - tBlockDataClear(pBlockData); - - STsdbFD *pFD = (iStt < 0) ? pReader->pDataFD : pReader->aSttFD[iStt]; - - // uid + version + tskey - code = tRealloc(&pReader->aBuf[0], pBlkInfo->szKey); - if (code) goto _err; - - code = tsdbReadFile(pFD, pBlkInfo->offset, pReader->aBuf[0], pBlkInfo->szKey, 0); - if (code) goto _err; - - SDiskDataHdr hdr; - uint8_t *p = pReader->aBuf[0] + tGetDiskDataHdr(pReader->aBuf[0], &hdr); - - ASSERT(hdr.delimiter == TSDB_FILE_DLMT); - ASSERT(pBlockData->suid == hdr.suid); - - pBlockData->uid = hdr.uid; - pBlockData->nRow = hdr.nRow; - - // uid - if (hdr.uid == 0) { - ASSERT(hdr.szUid); - code = tsdbDecmprData(p, hdr.szUid, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aUid, - sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]); - if (code) goto _err; - } else { - ASSERT(!hdr.szUid); - } - p += hdr.szUid; - - // version - code = tsdbDecmprData(p, hdr.szVer, TSDB_DATA_TYPE_BIGINT, hdr.cmprAlg, (uint8_t **)&pBlockData->aVersion, - sizeof(int64_t) * hdr.nRow, &pReader->aBuf[1]); - if (code) goto _err; - p += hdr.szVer; - - // TSKEY - code = tsdbDecmprData(p, hdr.szKey, TSDB_DATA_TYPE_TIMESTAMP, hdr.cmprAlg, (uint8_t **)&pBlockData->aTSKEY, - sizeof(TSKEY) * hdr.nRow, &pReader->aBuf[1]); - if (code) goto _err; - p += hdr.szKey; - - ASSERT(p - pReader->aBuf[0] == pBlkInfo->szKey); - - // read and decode columns - if (pBlockData->nColData == 0) goto _exit; - - if (hdr.szBlkCol > 0) { - int64_t offset = pBlkInfo->offset + pBlkInfo->szKey; - - code = tRealloc(&pReader->aBuf[0], hdr.szBlkCol); - if (code) goto _err; - - code = tsdbReadFile(pFD, offset, pReader->aBuf[0], hdr.szBlkCol, 0); - if (code) goto _err; - } - - SBlockCol blockCol = {.cid = 0}; - SBlockCol *pBlockCol = &blockCol; - int32_t n = 0; - - for (int32_t iColData = 0; iColData < pBlockData->nColData; iColData++) { - SColData *pColData = tBlockDataGetColDataByIdx(pBlockData, iColData); - - while (pBlockCol && pBlockCol->cid < pColData->cid) { - if (n < hdr.szBlkCol) { - n += tGetBlockCol(pReader->aBuf[0] + n, pBlockCol); - } else { - ASSERT(n == hdr.szBlkCol); - pBlockCol = NULL; - } - } - - if (pBlockCol == NULL || pBlockCol->cid > pColData->cid) { - // add a lot of NONE - for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) { - code = tColDataAppendValue(pColData, &COL_VAL_NONE(pColData->cid, pColData->type)); - if (code) goto _err; - } - } else { - ASSERT(pBlockCol->type == pColData->type); - ASSERT(pBlockCol->flag && pBlockCol->flag != HAS_NONE); - - if (pBlockCol->flag == HAS_NULL) { - // add a lot of NULL - for (int32_t iRow = 0; iRow < hdr.nRow; iRow++) { - code = tColDataAppendValue(pColData, &COL_VAL_NULL(pBlockCol->cid, pBlockCol->type)); - if (code) goto _err; - } - } else { - // decode from binary - int64_t offset = pBlkInfo->offset + pBlkInfo->szKey + hdr.szBlkCol + pBlockCol->offset; - int32_t size = pBlockCol->szBitmap + pBlockCol->szOffset + pBlockCol->szValue; - - code = tRealloc(&pReader->aBuf[1], size); - if (code) goto _err; - - code = tsdbReadFile(pFD, offset, pReader->aBuf[1], size, 0); - if (code) goto _err; - - code = tsdbDecmprColData(pReader->aBuf[1], pBlockCol, hdr.cmprAlg, hdr.nRow, pColData, &pReader->aBuf[2]); - if (code) goto _err; - } - } - } - -_exit: - return code; - -_err: - tsdbError("vgId:%d, tsdb read block data impl failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - return code; -} - -int32_t tsdbReadDataBlockEx(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { - int32_t code = 0; - SBlockInfo *pBlockInfo = &pDataBlk->aSubBlock[0]; - - // alloc - code = tRealloc(&pReader->aBuf[0], pBlockInfo->szBlock); - if (code) goto _err; - - // read - code = tsdbReadFile(pReader->pDataFD, pBlockInfo->offset, pReader->aBuf[0], pBlockInfo->szBlock, 0); - if (code) goto _err; - - // decmpr - code = tDecmprBlockData(pReader->aBuf[0], pBlockInfo->szBlock, pBlockData, &pReader->aBuf[1]); - if (code) goto _err; - - return code; - -_err: - tsdbError("vgId:%d, tsdb read data block ex failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - return code; -} - -int32_t tsdbReadDataBlock(SDataFReader *pReader, SDataBlk *pDataBlk, SBlockData *pBlockData) { - int32_t code = 0; - - code = tsdbReadBlockDataImpl(pReader, &pDataBlk->aSubBlock[0], pBlockData, -1); - if (code) goto _err; - - ASSERT(pDataBlk->nSubBlock == 1); - - return code; - -_err: - tsdbError("vgId:%d, tsdb read data block failed since %s", TD_VID(pReader->pTsdb->pVnode), tstrerror(code)); - return code; -} - -int32_t tsdbReadSttBlock(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData) { - int32_t code = 0; - int32_t lino = 0; - - code = tsdbReadBlockDataImpl(pReader, &pSttBlk->bInfo, pBlockData, iStt); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - -int32_t tsdbReadSttBlockEx(SDataFReader *pReader, int32_t iStt, SSttBlk *pSttBlk, SBlockData *pBlockData) { - int32_t code = 0; - int32_t lino = 0; - - // alloc - code = tRealloc(&pReader->aBuf[0], pSttBlk->bInfo.szBlock); - TSDB_CHECK_CODE(code, lino, _exit); - - // read - code = tsdbReadFile(pReader->aSttFD[iStt], pSttBlk->bInfo.offset, pReader->aBuf[0], pSttBlk->bInfo.szBlock, 0); - TSDB_CHECK_CODE(code, lino, _exit); - - // decmpr - code = tDecmprBlockData(pReader->aBuf[0], pSttBlk->bInfo.szBlock, pBlockData, &pReader->aBuf[1]); - TSDB_CHECK_CODE(code, lino, _exit); - -_exit: - if (code) { - tsdbError("vgId:%d, %s failed at %d since %s", TD_VID(pReader->pTsdb->pVnode), __func__, lino, tstrerror(code)); - } - return code; -} - // SDelFReader ==================================================== struct SDelFReader { STsdb *pTsdb; diff --git a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c index 4710337e80..10c6062155 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUpgrade.c +++ b/source/dnode/vnode/src/tsdb/tsdbUpgrade.c @@ -449,12 +449,12 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray * struct { // context - bool toStt; - int8_t cmprAlg; - int32_t maxRow; - int64_t minKey; - int64_t maxKey; - uint8_t *bufArr[8]; + bool toStt; + int8_t cmprAlg; + int32_t maxRow; + int64_t minKey; + int64_t maxKey; + SBuffer buffers[8]; // reader SArray *aDelData; // writer @@ -503,7 +503,7 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray * } SVersionRange tombRange = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray, - ctx->bufArr, &tombRange); + ctx->buffers, &tombRange); TSDB_CHECK_CODE(code, lino, _exit); } } @@ -516,7 +516,7 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray * } SVersionRange tombRange = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN}; code = tsdbFileWriteTombBlock(ctx->fd, ctx->tombBlock, ctx->cmprAlg, &ctx->fobj->f->size, ctx->tombBlkArray, - ctx->bufArr, &tombRange); + ctx->buffers, &tombRange); TSDB_CHECK_CODE(code, lino, _exit); } @@ -545,8 +545,8 @@ _exit: if (code) { TSDB_ERROR_LOG(TD_VID(tsdb->pVnode), lino, code); } - for (int32_t i = 0; i < ARRAY_SIZE(ctx->bufArr); i++) { - tFree(ctx->bufArr[i]); + for (int32_t i = 0; i < ARRAY_SIZE(ctx->buffers); i++) { + tBufferDestroy(ctx->buffers + i); } TARRAY2_DESTROY(ctx->tombBlkArray, NULL); tTombBlockDestroy(ctx->tombBlock); diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index 71b70c0cb3..8643adc269 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -380,47 +380,44 @@ int32_t tGetSttBlk(uint8_t *p, void *ph) { } // SBlockCol ====================================================== -int32_t tPutBlockCol(uint8_t *p, void *ph) { - int32_t n = 0; - SBlockCol *pBlockCol = (SBlockCol *)ph; +int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol) { + int32_t code; ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE)); - n += tPutI16v(p ? p + n : p, pBlockCol->cid); - n += tPutI8(p ? p + n : p, pBlockCol->type); - n += tPutI8(p ? p + n : p, pBlockCol->cflag); - n += tPutI8(p ? p + n : p, pBlockCol->flag); - n += tPutI32v(p ? p + n : p, pBlockCol->szOrigin); + if ((code = tBufferPutI16v(buffer, pBlockCol->cid))) return code; + if ((code = tBufferPutI8(buffer, pBlockCol->type))) return code; + if ((code = tBufferPutI8(buffer, pBlockCol->cflag))) return code; + if ((code = tBufferPutI8(buffer, pBlockCol->flag))) return code; + if ((code = tBufferPutI32v(buffer, pBlockCol->szOrigin))) return code; if (pBlockCol->flag != HAS_NULL) { if (pBlockCol->flag != HAS_VALUE) { - n += tPutI32v(p ? p + n : p, pBlockCol->szBitmap); + if ((code = tBufferPutI32v(buffer, pBlockCol->szBitmap))) return code; } if (IS_VAR_DATA_TYPE(pBlockCol->type)) { - n += tPutI32v(p ? p + n : p, pBlockCol->szOffset); + if ((code = tBufferPutI32v(buffer, pBlockCol->szOffset))) return code; } if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) { - n += tPutI32v(p ? p + n : p, pBlockCol->szValue); + if ((code = tBufferPutI32v(buffer, pBlockCol->szValue))) return code; } - n += tPutI32v(p ? p + n : p, pBlockCol->offset); + if ((code = tBufferPutI32v(buffer, pBlockCol->offset))) return code; } -_exit: - return n; + return 0; } -int32_t tGetBlockCol(uint8_t *p, void *ph) { - int32_t n = 0; - SBlockCol *pBlockCol = (SBlockCol *)ph; +int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol) { + int32_t code; - n += tGetI16v(p + n, &pBlockCol->cid); - n += tGetI8(p + n, &pBlockCol->type); - n += tGetI8(p + n, &pBlockCol->cflag); - n += tGetI8(p + n, &pBlockCol->flag); - n += tGetI32v(p + n, &pBlockCol->szOrigin); + if ((code = tBufferGetI16v(br, &pBlockCol->cid))) return code; + if ((code = tBufferGetI8(br, &pBlockCol->type))) return code; + if ((code = tBufferGetI8(br, &pBlockCol->cflag))) return code; + if ((code = tBufferGetI8(br, &pBlockCol->flag))) return code; + if ((code = tBufferGetI32v(br, &pBlockCol->szOrigin))) return code; ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE)); @@ -431,21 +428,21 @@ int32_t tGetBlockCol(uint8_t *p, void *ph) { if (pBlockCol->flag != HAS_NULL) { if (pBlockCol->flag != HAS_VALUE) { - n += tGetI32v(p + n, &pBlockCol->szBitmap); + if ((code = tBufferGetI32v(br, &pBlockCol->szBitmap))) return code; } if (IS_VAR_DATA_TYPE(pBlockCol->type)) { - n += tGetI32v(p + n, &pBlockCol->szOffset); + if ((code = tBufferGetI32v(br, &pBlockCol->szOffset))) return code; } if (pBlockCol->flag != (HAS_NULL | HAS_NONE)) { - n += tGetI32v(p + n, &pBlockCol->szValue); + if ((code = tBufferGetI32v(br, &pBlockCol->szValue))) return code; } - n += tGetI32v(p + n, &pBlockCol->offset); + if ((code = tBufferGetI32v(br, &pBlockCol->offset))) return code; } - return n; + return 0; } #ifdef BUILD_NO_CALL @@ -1381,6 +1378,7 @@ SColData *tBlockDataGetColData(SBlockData *pBlockData, int16_t cid) { return NULL; } +#if 0 int32_t tCmprBlockData(SBlockData *pBlockData, int8_t cmprAlg, uint8_t **ppOut, int32_t *szOut, uint8_t *aBuf[], int32_t aBufSize[]) { int32_t code = 0; @@ -1590,15 +1588,148 @@ int32_t tDecmprBlockData(uint8_t *pIn, int32_t szIn, SBlockData *pBlockData, uin _exit: return code; } +#endif -int32_t tBlockDataCompress(SBlockData *bData, SBuffer *buffer, SBuffer *assist) { - int32_t code = 0; - SDiskDataHdr hdr = {0}; +static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, SBuffer *buffer, SBuffer *assist) { + int32_t code = 0; + int32_t lino = 0; + SCompressInfo cinfo; - // SDiskDataHdr - // br->offset += tGetDiskDataHdr((uint8_t *)tBufferGetDataAt(br->buffer, br->offset), &hdr); + // uid + if (bData->uid == 0) { + cinfo = (SCompressInfo){ + .cmprAlg = hdr->cmprAlg, + .dataType = TSDB_DATA_TYPE_BIGINT, + .originalSize = sizeof(int64_t) * bData->nRow, + }; + code = tCompressDataToBuffer(bData->aUid, cinfo.originalSize, &cinfo, buffer, assist); + TSDB_CHECK_CODE(code, lino, _exit); + hdr->szUid = cinfo.compressedSize; + } - tBlockDataReset(bData); + // version + cinfo = (SCompressInfo){ + .cmprAlg = hdr->cmprAlg, + .dataType = TSDB_DATA_TYPE_BIGINT, + .originalSize = sizeof(int64_t) * bData->nRow, + }; + code = tCompressDataToBuffer((uint8_t *)bData->aVersion, cinfo.originalSize, &cinfo, buffer, assist); + TSDB_CHECK_CODE(code, lino, _exit); + hdr->szVer = cinfo.compressedSize; + + // ts + cinfo = (SCompressInfo){ + .cmprAlg = hdr->cmprAlg, + .dataType = TSDB_DATA_TYPE_TIMESTAMP, + .originalSize = sizeof(TSKEY) * bData->nRow, + }; + code = tCompressDataToBuffer((uint8_t *)bData->aTSKEY, cinfo.originalSize, &cinfo, buffer, assist); + TSDB_CHECK_CODE(code, lino, _exit); + hdr->szKey = cinfo.compressedSize; + + // primary keys + for (hdr->numOfPKs = 0; hdr->numOfPKs < bData->nColData; hdr->numOfPKs++) { + ASSERT(hdr->numOfPKs <= TD_MAX_PK_COLS); + + SBlockCol *blockCol = &hdr->primaryBlockCols[hdr->numOfPKs]; + SColData *colData = tBlockDataGetColDataByIdx(bData, hdr->numOfPKs); + + if ((colData->cflag & COL_IS_KEY) == 0) { + break; + } + + SColDataCompressInfo info = { + .cmprAlg = hdr->cmprAlg, + }; + code = tColDataCompress(colData, &info, buffer, assist); + TSDB_CHECK_CODE(code, lino, _exit); + + *blockCol = (SBlockCol){ + .cid = info.columnId, + .type = info.dataType, + .cflag = info.columnFlag, + .flag = info.flag, + .szOrigin = info.dataOriginalSize, + .szBitmap = info.bitmapCompressedSize, + .szOffset = info.offsetCompressedSize, + .szValue = info.dataCompressedSize, + .offset = 0, + }; + } + +_exit: + return code; +} + +/* buffers[0]: SDiskDataHdr + * buffers[1]: key part: uid + version + ts + primary keys + * buffers[2]: SBlockCol part + * buffers[3]: regular column part + */ +int32_t tBlockDataCompress(SBlockData *bData, int8_t cmprAlg, SBuffer *buffers, SBuffer *assist) { + int32_t code = 0; + int32_t lino = 0; + + SDiskDataHdr hdr = { + .delimiter = TSDB_FILE_DLMT, + .fmtVer = 1, + .suid = bData->suid, + .uid = bData->uid, + .szUid = 0, // filled by compress key + .szVer = 0, // filled by compress key + .szKey = 0, // filled by compress key + .szBlkCol = 0, // filled by this func + .nRow = bData->nRow, + .cmprAlg = cmprAlg, + .numOfPKs = 0, // filled by compress key + }; + + // Key part + tBufferClear(&buffers[1]); + code = tBlockDataCompressKeyPart(bData, &hdr, &buffers[1], assist); + TSDB_CHECK_CODE(code, lino, _exit); + + // Regulart column part + tBufferClear(&buffers[2]); + tBufferClear(&buffers[3]); + for (int i = 0; i < bData->nColData; i++) { + SColData *colData = tBlockDataGetColDataByIdx(bData, i); + + if (colData->cflag & COL_IS_KEY) { + continue; + } + if (colData->flag == HAS_NONE) { + continue; + } + + SColDataCompressInfo cinfo = { + .cmprAlg = cmprAlg, + }; + int32_t offset = buffers[3].size; + code = tColDataCompress(colData, &cinfo, &buffers[3], assist); + TSDB_CHECK_CODE(code, lino, _exit); + + SBlockCol blockCol = (SBlockCol){ + .cid = cinfo.columnId, + .type = cinfo.dataType, + .cflag = cinfo.columnFlag, + .flag = cinfo.flag, + .szOrigin = cinfo.dataOriginalSize, + .szBitmap = cinfo.bitmapCompressedSize, + .szOffset = cinfo.offsetCompressedSize, + .szValue = cinfo.dataCompressedSize, + .offset = offset, + }; + + code = tPutBlockCol(&buffers[2], &blockCol); + TSDB_CHECK_CODE(code, lino, _exit); + } + hdr.szBlkCol = buffers[2].size; + + // SDiskDataHdr part + tBufferClear(&buffers[0]); + code = tPutDiskDataHdr(&buffers[0], &hdr); + TSDB_CHECK_CODE(code, lino, _exit); _exit: return code; @@ -1700,18 +1831,14 @@ int32_t tBlockDataDecompressKeyPart(const SDiskDataHdr *hdr, SBufferReader *br, br->offset += cinfo.compressedSize; // primary keys - if (hdr->numOfPKs > 0) { - SBlockCol blockCol; + for (int i = 0; i < hdr->numOfPKs; i++) { + const SBlockCol *blockCol = &hdr->primaryBlockCols[i]; - for (int i = 0; i < hdr->numOfPKs; i++) { - br->offset += tGetBlockCol((uint8_t *)BR_PTR(br), &blockCol); + ASSERT(blockCol->flag == HAS_VALUE); + ASSERT(blockCol->cflag & COL_IS_KEY); - ASSERT(blockCol.flag == HAS_VALUE); - ASSERT(blockCol.cflag & COL_IS_KEY); - - code = tBlockDataDecompressColData(hdr, &blockCol, br, blockData, assist); - TSDB_CHECK_CODE(code, lino, _exit); - } + code = tBlockDataDecompressColData(hdr, blockCol, br, blockData, assist); + TSDB_CHECK_CODE(code, lino, _exit); } _exit: @@ -1725,7 +1852,8 @@ int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer * SCompressInfo cinfo; // SDiskDataHdr - br->offset += tGetDiskDataHdr((uint8_t *)BR_PTR(br), &hdr); + code = tGetDiskDataHdr(br, &hdr); + TSDB_CHECK_CODE(code, lino, _exit); tBlockDataReset(blockData); blockData->suid = hdr.suid; @@ -1737,15 +1865,13 @@ int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer * TSDB_CHECK_CODE(code, lino, _exit); // Column part - uint8_t *decodePtr = (uint8_t *)BR_PTR(br); - int32_t totalSize = 0; + SBufferReader br2 = *br; br->offset += hdr.szBlkCol; - while (totalSize < hdr.szBlkCol) { + for (uint32_t startOffset = br2.offset; br2.offset - startOffset < hdr.szBlkCol;) { SBlockCol blockCol; - int32_t size = tGetBlockCol(decodePtr, &blockCol); - decodePtr += size; - totalSize += size; + code = tGetBlockCol(&br2, &blockCol); + TSDB_CHECK_CODE(code, lino, _exit); code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist); TSDB_CHECK_CODE(code, lino, _exit); } @@ -1755,72 +1881,77 @@ _exit: } // SDiskDataHdr ============================== -int32_t tPutDiskDataHdr(uint8_t *p, const SDiskDataHdr *pHdr) { - int32_t n = 0; +int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr) { + int32_t code; - n += tPutU32(p ? p + n : p, pHdr->delimiter); - n += tPutU32v(p ? p + n : p, pHdr->fmtVer); - n += tPutI64(p ? p + n : p, pHdr->suid); - n += tPutI64(p ? p + n : p, pHdr->uid); - n += tPutI32v(p ? p + n : p, pHdr->szUid); - n += tPutI32v(p ? p + n : p, pHdr->szVer); - n += tPutI32v(p ? p + n : p, pHdr->szKey); - n += tPutI32v(p ? p + n : p, pHdr->szBlkCol); - n += tPutI32v(p ? p + n : p, pHdr->nRow); - n += tPutI8(p ? p + n : p, pHdr->cmprAlg); + if ((code = tBufferPutU32(buffer, pHdr->delimiter))) return code; + if ((code = tBufferPutU32v(buffer, pHdr->fmtVer))) return code; + if ((code = tBufferPutI64(buffer, pHdr->suid))) return code; + if ((code = tBufferPutI64(buffer, pHdr->uid))) return code; + if ((code = tBufferPutI32v(buffer, pHdr->szUid))) return code; + if ((code = tBufferPutI32v(buffer, pHdr->szVer))) return code; + if ((code = tBufferPutI32v(buffer, pHdr->szKey))) return code; + if ((code = tBufferPutI32v(buffer, pHdr->szBlkCol))) return code; + if ((code = tBufferPutI32v(buffer, pHdr->nRow))) return code; + if ((code = tBufferPutI8(buffer, pHdr->cmprAlg))) return code; if (pHdr->fmtVer == 1) { - n += tPutI8(p ? p + n : p, pHdr->numOfPKs); + if ((code = tBufferPutI8(buffer, pHdr->numOfPKs))) return code; + for (int i = 0; i < pHdr->numOfPKs; i++) { + if ((code = tPutBlockCol(buffer, &pHdr->primaryBlockCols[i]))) return code; + } } - return n; + return 0; } -int32_t tGetDiskDataHdr(uint8_t *p, void *ph) { - int32_t n = 0; - SDiskDataHdr *pHdr = (SDiskDataHdr *)ph; +int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr) { + int32_t code; - n += tGetU32(p + n, &pHdr->delimiter); - n += tGetU32v(p + n, &pHdr->fmtVer); - n += tGetI64(p + n, &pHdr->suid); - n += tGetI64(p + n, &pHdr->uid); - n += tGetI32v(p + n, &pHdr->szUid); - n += tGetI32v(p + n, &pHdr->szVer); - n += tGetI32v(p + n, &pHdr->szKey); - n += tGetI32v(p + n, &pHdr->szBlkCol); - n += tGetI32v(p + n, &pHdr->nRow); - n += tGetI8(p + n, &pHdr->cmprAlg); + if ((code = tBufferGetU32(br, &pHdr->delimiter))) return code; + if ((code = tBufferGetU32v(br, &pHdr->fmtVer))) return code; + if ((code = tBufferGetI64(br, &pHdr->suid))) return code; + if ((code = tBufferGetI64(br, &pHdr->uid))) return code; + if ((code = tBufferGetI32v(br, &pHdr->szUid))) return code; + if ((code = tBufferGetI32v(br, &pHdr->szVer))) return code; + if ((code = tBufferGetI32v(br, &pHdr->szKey))) return code; + if ((code = tBufferGetI32v(br, &pHdr->szBlkCol))) return code; + if ((code = tBufferGetI32v(br, &pHdr->nRow))) return code; + if ((code = tBufferGetI8(br, &pHdr->cmprAlg))) return code; if (pHdr->fmtVer == 1) { - n += tGetI8(p + n, &pHdr->numOfPKs); + if ((code = tBufferGetI8(br, &pHdr->numOfPKs))) return code; + for (int i = 0; i < pHdr->numOfPKs; i++) { + if ((code = tGetBlockCol(br, &pHdr->primaryBlockCols[i]))) return code; + } } else { pHdr->numOfPKs = 0; } - return n; + return 0; } // ALGORITHM ============================== -int32_t tPutColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg) { - int32_t n = 0; +int32_t tPutColumnDataAgg(SBuffer *buffer, SColumnDataAgg *pColAgg) { + int32_t code; - n += tPutI16v(p ? p + n : p, pColAgg->colId); - n += tPutI16v(p ? p + n : p, pColAgg->numOfNull); - n += tPutI64(p ? p + n : p, pColAgg->sum); - n += tPutI64(p ? p + n : p, pColAgg->max); - n += tPutI64(p ? p + n : p, pColAgg->min); + if ((code = tBufferPutI16v(buffer, pColAgg->colId))) return code; + if ((code = tBufferPutI16v(buffer, pColAgg->numOfNull))) return code; + if ((code = tBufferPutI64(buffer, pColAgg->sum))) return code; + if ((code = tBufferPutI64(buffer, pColAgg->max))) return code; + if ((code = tBufferPutI64(buffer, pColAgg->min))) return code; - return n; + return 0; } -int32_t tGetColumnDataAgg(uint8_t *p, SColumnDataAgg *pColAgg) { - int32_t n = 0; +int32_t tGetColumnDataAgg(SBufferReader *br, SColumnDataAgg *pColAgg) { + int32_t code; - n += tGetI16v(p + n, &pColAgg->colId); - n += tGetI16v(p + n, &pColAgg->numOfNull); - n += tGetI64(p + n, &pColAgg->sum); - n += tGetI64(p + n, &pColAgg->max); - n += tGetI64(p + n, &pColAgg->min); + if ((code = tBufferGetI16v(br, &pColAgg->colId))) return code; + if ((code = tBufferGetI16v(br, &pColAgg->numOfNull))) return code; + if ((code = tBufferGetI64(br, &pColAgg->sum))) return code; + if ((code = tBufferGetI64(br, &pColAgg->max))) return code; + if ((code = tBufferGetI64(br, &pColAgg->min))) return code; - return n; + return 0; } int32_t tsdbCmprData(uint8_t *pIn, int32_t szIn, int8_t type, int8_t cmprAlg, uint8_t **ppOut, int32_t nOut, diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.c b/source/dnode/vnode/src/tsdb/tsdbUtil2.c index d137436ff1..fbb9dcb740 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.c @@ -238,6 +238,12 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) { // if the number of primary keys are not the same, // return an error code and the caller should handle it return TSDB_CODE_INVALID_PARA; + } else { + for (int i = 0; i < brinBlock->numOfPKs; i++) { + if (record->firstKey.key.pks[i].type != brinBlock->firstKeyPKs[i].type) { + return TSDB_CODE_INVALID_PARA; + } + } } code = tBufferPutI64(&brinBlock->suids, record->suid); @@ -387,89 +393,6 @@ int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record) { return 0; } -// int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer) { -// int32_t code; -// SBuffer *helperBuffer = NULL; // TODO - -// brinBlk->dp[0].size = 0; -// brinBlk->numRec = brinBlock->numOfRecords; -// brinBlk->numOfPKs = brinBlock->numOfPKs; - -// // minTbid -// code = tBufferGet(&brinBlock->suids, 0, sizeof(brinBlk->minTbid.suid), &brinBlk->minTbid.suid); -// if (code) return code; -// code = tBufferGet(&brinBlock->uids, 0, sizeof(brinBlk->minTbid.uid), &brinBlk->minTbid.uid); -// if (code) return code; -// // maxTbid -// code = -// tBufferGet(&brinBlock->suids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.suid), -// &brinBlk->maxTbid.suid); -// if (code) return code; -// code = tBufferGet(&brinBlock->uids, brinBlock->numOfRecords - 1, sizeof(brinBlk->maxTbid.uid), -// &brinBlk->maxTbid.uid); if (code) return code; -// // minVer and maxVer -// const int64_t *minVers = (int64_t *)tBufferGetData(&brinBlock->minVers); -// const int64_t *maxVers = (int64_t *)tBufferGetData(&brinBlock->maxVers); -// brinBlk->minVer = minVers[0]; -// brinBlk->maxVer = maxVers[0]; -// for (int32_t i = 1; i < brinBlock->numOfRecords; ++i) { -// if (minVers[i] < brinBlk->minVer) brinBlk->minVer = minVers[i]; -// if (maxVers[i] > brinBlk->maxVer) brinBlk->maxVer = maxVers[i]; -// } - -// // compress data -// for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) { -// SBuffer *bf = &brinBlock->buffers[i]; -// SCompressInfo info = { -// .cmprAlg = brinBlk->cmprAlg, -// }; - -// if (tBufferGetSize(bf) == 8 * brinBlock->numOfRecords) { -// info.dataType = TSDB_DATA_TYPE_BIGINT; -// } else if (tBufferGetSize(bf) == 4 * brinBlock->numOfRecords) { -// info.dataType = TSDB_DATA_TYPE_INT; -// } else { -// ASSERT(0); -// } - -// code = tCompressDataToBuffer(tBufferGetData(bf), tBufferGetSize(bf), &info, buffer, helperBuffer); -// if (code) return code; -// brinBlk->size[i] = info.compressedSize; -// brinBlk->dp[0].size += info.compressedSize; -// } - -// // encode primary keys -// SValueColumnCompressInfo firstKeyPKsInfos[TD_MAX_PK_COLS]; -// SValueColumnCompressInfo lastKeyPKsInfos[TD_MAX_PK_COLS]; - -// for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) { -// SValueColumn *vc = &brinBlock->firstKeyPKs[i]; -// firstKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg; -// code = tValueColumnCompress(vc, &firstKeyPKsInfos[i], buffer, helperBuffer); -// if (code) return code; -// } - -// for (int32_t i = 0; i < brinBlk->numOfPKs; ++i) { -// SValueColumn *vc = &brinBlock->lastKeyPKs[i]; -// lastKeyPKsInfos[i].cmprAlg = brinBlk->cmprAlg; -// code = tValueColumnCompress(vc, &lastKeyPKsInfos[i], buffer, helperBuffer); -// if (code) return code; -// } - -// return 0; -// } - -// int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock) { -// if (brinBlk->fmtVersion == 0) { -// return tBrinBlockDecodeVersion0(buffer, brinBlk, brinBlock); -// } else if (brinBlk->fmtVersion == 1) { -// return tBrinBlockDecodeVersion1(buffer, brinBlk, brinBlock); -// } else { -// ASSERT(0); -// } -// return 0; -// } - // other apis ---------- int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb) { if (tbid->suid) { diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil2.h b/source/dnode/vnode/src/tsdb/tsdbUtil2.h index 03253ba649..806f529423 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil2.h +++ b/source/dnode/vnode/src/tsdb/tsdbUtil2.h @@ -183,8 +183,6 @@ int32_t tBrinBlockDestroy(SBrinBlock *brinBlock); int32_t tBrinBlockClear(SBrinBlock *brinBlock); int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record); int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record); -// int32_t tBrinBlockEncode(SBrinBlock *brinBlock, SBrinBlk *brinBlk, SBuffer *buffer); -// int32_t tBrinBlockDecode(const SBuffer *buffer, SBrinBlk *brinBlk, SBrinBlock *brinBlock); // other apis int32_t tsdbUpdateSkmTb(STsdb *pTsdb, const TABLEID *tbid, SSkmInfo *pSkmTb);