From d85ae3c26eb16df442e1f4f820e7a506fe9f5e8b Mon Sep 17 00:00:00 2001 From: yihaoDeng Date: Tue, 2 Apr 2024 12:14:02 +0800 Subject: [PATCH] fix invalid free --- source/dnode/vnode/src/inc/tsdb.h | 154 +++++++++---------- source/dnode/vnode/src/tsdb/tsdbDataFileRW.c | 8 +- source/dnode/vnode/src/tsdb/tsdbSttFileRW.c | 18 +-- source/dnode/vnode/src/tsdb/tsdbUtil.c | 47 +++--- 4 files changed, 117 insertions(+), 110 deletions(-) diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index cf59cd149d..ce66ceddd6 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -146,8 +146,8 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2); #define MIN_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) < 0) ? (KEY1) : (KEY2)) #define MAX_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) > 0) ? (KEY1) : (KEY2)) // SBlockCol -int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver); -int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver); +int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver, uint32_t cmprAlg); +int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver, uint32_t cmprAlg); int32_t tBlockColCmprFn(const void *p1, const void *p2); // SDataBlk void tDataBlkReset(SDataBlk *pBlock); @@ -225,10 +225,10 @@ void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive); -SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); +SArray * tsdbMemTableGetTbDataArray(SMemTable *pMemTable); // STbDataIter int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter); -void *tsdbTbDataIterDestroy(STbDataIter *pIter); +void * tsdbTbDataIterDestroy(STbDataIter *pIter); void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter); void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum); @@ -281,7 +281,7 @@ int32_t tsdbGetTableSchema(SMeta *pMeta, int64_t uid, STSchema **pSchema, int64_ // tsdbMerge.c ============================================================================================== typedef struct { - STsdb *tsdb; + STsdb * tsdb; int32_t fid; } SMergeArg; @@ -312,22 +312,22 @@ int32_t tsdbDataIterNext2(STsdbDataIter2 *pIter, STsdbFilterInfo *pFilterInfo); // structs ======================= struct STsdbFS { SDelFile *pDelFile; - SArray *aDFileSet; // SArray + SArray * aDFileSet; // SArray }; typedef struct { - rocksdb_t *db; - rocksdb_comparator_t *my_comparator; - rocksdb_cache_t *blockcache; + rocksdb_t * db; + rocksdb_comparator_t * my_comparator; + rocksdb_cache_t * blockcache; rocksdb_block_based_table_options_t *tableoptions; - rocksdb_options_t *options; - rocksdb_flushoptions_t *flushoptions; - rocksdb_writeoptions_t *writeoptions; - rocksdb_readoptions_t *readoptions; - rocksdb_writebatch_t *writebatch; - rocksdb_writebatch_t *rwritebatch; + rocksdb_options_t * options; + rocksdb_flushoptions_t * flushoptions; + rocksdb_writeoptions_t * writeoptions; + rocksdb_readoptions_t * readoptions; + rocksdb_writebatch_t * writebatch; + rocksdb_writebatch_t * rwritebatch; TdThreadMutex rMutex; - STSchema *pTSchema; + STSchema * pTSchema; } SRocksCache; typedef struct { @@ -336,22 +336,22 @@ typedef struct { } SCacheFlushState; struct STsdb { - char *path; - SVnode *pVnode; + char * path; + SVnode * pVnode; STsdbKeepCfg keepCfg; TdThreadMutex mutex; bool bgTaskDisabled; - SMemTable *mem; - SMemTable *imem; + SMemTable * mem; + SMemTable * imem; STsdbFS fs; // old - SLRUCache *lruCache; + SLRUCache * lruCache; SCacheFlushState flushState; TdThreadMutex lruMutex; - SLRUCache *biCache; + SLRUCache * biCache; TdThreadMutex biMutex; - SLRUCache *bCache; + SLRUCache * bCache; TdThreadMutex bMutex; - SLRUCache *pgCache; + SLRUCache * pgCache; TdThreadMutex pgMutex; struct STFileSystem *pFS; // new SRocksCache rCache; @@ -380,17 +380,17 @@ struct STbData { TSKEY minKey; TSKEY maxKey; SRWLatch lock; - SDelData *pHead; - SDelData *pTail; + SDelData * pHead; + SDelData * pTail; SMemSkipList sl; - STbData *next; + STbData * next; SRBTreeNode rbtn[1]; }; struct SMemTable { SRWLatch latch; - STsdb *pTsdb; - SVBufPool *pPool; + STsdb * pTsdb; + SVBufPool * pPool; volatile int32_t nRef; int64_t minVer; int64_t maxVer; @@ -400,7 +400,7 @@ struct SMemTable { int64_t nDel; int32_t nTbData; int32_t nBucket; - STbData **aBucket; + STbData ** aBucket; SRBTree tbDataTree[1]; }; @@ -409,7 +409,7 @@ struct TSDBROW { union { struct { int64_t version; - SRow *pTSRow; + SRow * pTSRow; }; struct { SBlockData *pBlockData; @@ -510,9 +510,9 @@ struct SBlockData { int64_t suid; // 0 means normal table block data, otherwise child table block data int64_t uid; // 0 means block data in .last file, otherwise in .data file int32_t nRow; // number of rows - int64_t *aUid; // uids of each row, only exist in block data in .last file (uid == 0) - int64_t *aVersion; // versions of each row - TSKEY *aTSKEY; // timestamp of each row + int64_t * aUid; // uids of each row, only exist in block data in .last file (uid == 0) + int64_t * aVersion; // versions of each row + TSKEY * aTSKEY; // timestamp of each row int32_t nColData; SColData *aColData; }; @@ -523,10 +523,10 @@ struct TABLEID { }; struct STbDataIter { - STbData *pTbData; + STbData * pTbData; int8_t backward; SMemSkipListNode *pNode; - TSDBROW *pRow; + TSDBROW * pRow; TSDBROW row; }; @@ -604,9 +604,9 @@ struct SDFileSet { int32_t fid; SHeadFile *pHeadF; SDataFile *pDataF; - SSmaFile *pSmaF; + SSmaFile * pSmaF; uint8_t nSttF; - SSttFile *aSttF[TSDB_STT_TRIGGER_ARRAY_SIZE]; + SSttFile * aSttF[TSDB_STT_TRIGGER_ARRAY_SIZE]; }; struct STSDBRowIter { @@ -622,18 +622,18 @@ struct STSDBRowIter { struct SRowMerger { STSchema *pTSchema; int64_t version; - SArray *pArray; // SArray + SArray * pArray; // SArray }; typedef struct { - char *path; + char * path; int32_t szPage; int32_t flag; TdFilePtr pFD; int64_t pgno; - uint8_t *pBuf; + uint8_t * pBuf; int64_t szFile; - STsdb *pTsdb; + STsdb * pTsdb; const char *objName; uint8_t s3File; int32_t fid; @@ -642,7 +642,7 @@ typedef struct { } STsdbFD; struct SDelFWriter { - STsdb *pTsdb; + STsdb * pTsdb; SDelFile fDel; STsdbFD *pWriteH; uint8_t *aBuf[1]; @@ -702,15 +702,15 @@ int32_t tDeserializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo); // snap read struct STsdbReadSnap { - SMemTable *pMem; - SQueryNode *pNode; - SMemTable *pIMem; - SQueryNode *pINode; + SMemTable * pMem; + SQueryNode * pNode; + SMemTable * pIMem; + SQueryNode * pINode; TFileSetArray *pfSetArray; }; struct SDataFWriter { - STsdb *pTsdb; + STsdb * pTsdb; SDFileSet wSet; STsdbFD *pHeadFD; @@ -727,13 +727,13 @@ struct SDataFWriter { }; struct SDataFReader { - STsdb *pTsdb; + STsdb * pTsdb; SDFileSet *pSet; - STsdbFD *pHeadFD; - STsdbFD *pDataFD; - STsdbFD *pSmaFD; - STsdbFD *aSttFD[TSDB_STT_TRIGGER_ARRAY_SIZE]; - uint8_t *aBuf[3]; + STsdbFD * pHeadFD; + STsdbFD * pDataFD; + STsdbFD * pSmaFD; + STsdbFD * aSttFD[TSDB_STT_TRIGGER_ARRAY_SIZE]; + uint8_t * aBuf[3]; }; // NOTE: do NOT change the order of the fields @@ -766,10 +766,10 @@ typedef struct { typedef struct SSttBlockLoadInfo { SBlockDataInfo blockData[2]; // buffered block data - SArray *aSttBlk; + SArray * aSttBlk; int32_t currentLoadBlockIndex; - STSchema *pSchema; - int16_t *colIds; + STSchema * pSchema; + int16_t * colIds; int32_t numOfCols; bool checkRemainingRow; // todo: no assign value? bool isLast; @@ -806,7 +806,7 @@ struct SDiskData { const uint8_t *pUid; const uint8_t *pVer; const uint8_t *pKey; - SArray *aDiskCol; // SArray + SArray * aDiskCol; // SArray }; struct SDiskDataBuilder { @@ -819,15 +819,15 @@ struct SDiskDataBuilder { SCompressor *pVerC; SCompressor *pKeyC; int32_t nBuilder; - SArray *aBuilder; // SArray - uint8_t *aBuf[2]; + SArray * aBuilder; // SArray + uint8_t * aBuf[2]; SDiskData dd; SBlkInfo bi; }; struct SLDataIter { SRBTreeNode node; - SSttBlk *pSttBlk; + SSttBlk * pSttBlk; int64_t cid; // for debug purpose int8_t backward; int32_t iSttBlk; @@ -836,8 +836,8 @@ struct SLDataIter { uint64_t uid; STimeWindow timeWindow; SVersionRange verRange; - SSttBlockLoadInfo *pBlockLoadInfo; - SRowKey startRowKey; // current row key + SSttBlockLoadInfo * pBlockLoadInfo; + SRowKey startRowKey; // current row key __compar_fn_t comparFn; bool ignoreEarlierTs; struct SSttFileReader *pReader; @@ -851,22 +851,22 @@ typedef int32_t (*_load_tomb_fn)(STsdbReader *pReader, struct SSttFileReader *pS typedef struct SMergeTreeConf { int8_t backward; - STsdb *pTsdb; + STsdb * pTsdb; uint64_t suid; uint64_t uid; STimeWindow timewindow; SVersionRange verRange; bool strictTimeRange; - SArray *pSttFileBlockIterArray; - void *pCurrentFileset; - STSchema *pSchema; - int16_t *pCols; + SArray * pSttFileBlockIterArray; + void * pCurrentFileset; + STSchema * pSchema; + int16_t * pCols; int32_t numOfCols; - SRowKey *pCurRowKey; + SRowKey * pCurRowKey; _load_tomb_fn loadTombFn; __compar_fn_t comparFn; - void *pReader; - void *idstr; + void * pReader; + void * idstr; bool rspRows; // response the rows in stt-file, if possible } SMergeTreeConf; @@ -884,8 +884,8 @@ bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree); SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols); -void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); -void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost); +void * destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); +void * destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost); // tsdbCache ============================================================================================== typedef enum { @@ -967,7 +967,7 @@ struct STsdbDataIter2 { // TSDB_DATA_FILE_DATA_ITER struct { SDataFReader *pReader; - SArray *aBlockIdx; // SArray + SArray * aBlockIdx; // SArray SMapData mDataBlk; SBlockData bData; int32_t iBlockIdx; @@ -979,7 +979,7 @@ struct STsdbDataIter2 { struct { SDataFReader *pReader; int32_t iStt; - SArray *aSttBlk; + SArray * aSttBlk; SBlockData bData; int32_t iSttBlk; int32_t iRow; @@ -987,8 +987,8 @@ struct STsdbDataIter2 { // TSDB_TOMB_FILE_DATA_ITER struct { SDelFReader *pReader; - SArray *aDelIdx; - SArray *aDelData; + SArray * aDelIdx; + SArray * aDelData; int32_t iDelIdx; int32_t iDelData; } tIter; diff --git a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c index 6a2a0dfb61..213a264bcb 100644 --- a/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbDataFileRW.c @@ -301,9 +301,9 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe int32_t lino = 0; SDiskDataHdr hdr; - SBuffer *buffer0 = reader->buffers + 0; - SBuffer *buffer1 = reader->buffers + 1; - SBuffer *assist = reader->buffers + 2; + SBuffer * buffer0 = reader->buffers + 0; + SBuffer * buffer1 = reader->buffers + 1; + SBuffer * assist = reader->buffers + 2; // load key part tBufferClear(buffer0); @@ -363,7 +363,7 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe break; } - code = tGetBlockCol(&br, &blockCol, hdr.fmtVer); + code = tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg); TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c index b049a1a716..e8db359aba 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c +++ b/source/dnode/vnode/src/tsdb/tsdbSttFileRW.c @@ -20,7 +20,7 @@ // SSttFReader ============================================================ struct SSttFileReader { SSttFileReaderConfig config[1]; - STsdbFD *fd; + STsdbFD * fd; SSttFooter footer[1]; struct { bool sttBlkLoaded; @@ -31,7 +31,7 @@ struct SSttFileReader { TStatisBlkArray statisBlkArray[1]; TTombBlkArray tombBlkArray[1]; SBuffer local[10]; - SBuffer *buffers; + SBuffer * buffers; }; // SSttFileReader @@ -96,7 +96,7 @@ int32_t tsdbSttFileReadStatisBlk(SSttFileReader *reader, const TStatisBlkArray * ASSERT(reader->footer->statisBlkPtr->size % sizeof(SStatisBlk) == 0); int32_t size = reader->footer->statisBlkPtr->size / sizeof(SStatisBlk); - void *data = taosMemoryMalloc(reader->footer->statisBlkPtr->size); + void * data = taosMemoryMalloc(reader->footer->statisBlkPtr->size); if (!data) return TSDB_CODE_OUT_OF_MEMORY; int32_t code = @@ -124,7 +124,7 @@ int32_t tsdbSttFileReadTombBlk(SSttFileReader *reader, const TTombBlkArray **tom ASSERT(reader->footer->tombBlkPtr->size % sizeof(STombBlk) == 0); int32_t size = reader->footer->tombBlkPtr->size / sizeof(STombBlk); - void *data = taosMemoryMalloc(reader->footer->tombBlkPtr->size); + void * data = taosMemoryMalloc(reader->footer->tombBlkPtr->size); if (!data) return TSDB_CODE_OUT_OF_MEMORY; int32_t code = @@ -152,7 +152,7 @@ int32_t tsdbSttFileReadSttBlk(SSttFileReader *reader, const TSttBlkArray **sttBl ASSERT(reader->footer->sttBlkPtr->size % sizeof(SSttBlk) == 0); int32_t size = reader->footer->sttBlkPtr->size / sizeof(SSttBlk); - void *data = taosMemoryMalloc(reader->footer->sttBlkPtr->size); + void * data = taosMemoryMalloc(reader->footer->sttBlkPtr->size); if (!data) return TSDB_CODE_OUT_OF_MEMORY; int32_t code = @@ -203,9 +203,9 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk * int32_t lino = 0; SDiskDataHdr hdr; - SBuffer *buffer0 = reader->buffers + 0; - SBuffer *buffer1 = reader->buffers + 1; - SBuffer *assist = reader->buffers + 2; + SBuffer * buffer0 = reader->buffers + 0; + SBuffer * buffer1 = reader->buffers + 1; + SBuffer * assist = reader->buffers + 2; // load key part tBufferClear(buffer0); @@ -265,7 +265,7 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk * break; } - code = tGetBlockCol(&br, &blockCol, hdr.fmtVer); + code = tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg); TSDB_CHECK_CODE(code, lino, _exit); } diff --git a/source/dnode/vnode/src/tsdb/tsdbUtil.c b/source/dnode/vnode/src/tsdb/tsdbUtil.c index b3a324392c..8a2a2926da 100644 --- a/source/dnode/vnode/src/tsdb/tsdbUtil.c +++ b/source/dnode/vnode/src/tsdb/tsdbUtil.c @@ -247,7 +247,7 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) { int32_t tCmprBlockL(void const *lhs, void const *rhs) { SBlockIdx *lBlockIdx = (SBlockIdx *)lhs; - SSttBlk *rBlockL = (SSttBlk *)rhs; + SSttBlk * rBlockL = (SSttBlk *)rhs; if (lBlockIdx->suid < rBlockL->suid) { return -1; @@ -388,9 +388,9 @@ int32_t tGetSttBlk(uint8_t *p, void *ph) { // SBlockCol ====================================================== -static const int32_t BLOCK_WITH_ALG_VER = 1; +static const int32_t BLOCK_WITH_ALG_VER = 2; -int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver) { +int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver, uint32_t defaultCmprAlg) { int32_t code; ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE)); @@ -418,11 +418,13 @@ int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver) { } if (ver >= BLOCK_WITH_ALG_VER) { if ((code = tBufferPutU32(buffer, pBlockCol->alg))) return code; + } else { + if ((code = tBufferPutU32(buffer, defaultCmprAlg))) return code; } return 0; } -int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver) { +int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver, uint32_t defaultCmprAlg) { int32_t code; if ((code = tBufferGetI16v(br, &pBlockCol->cid))) return code; @@ -456,6 +458,8 @@ int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver) { if (ver >= BLOCK_WITH_ALG_VER) { if ((code = tBufferGetU32(br, &pBlockCol->alg))) return code; + } else { + pBlockCol->alg = defaultCmprAlg; } return 0; @@ -624,7 +628,7 @@ void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key) { } } -void tColRowGetKey(SBlockData* pBlock, int32_t irow, SRowKey* key) { +void tColRowGetKey(SBlockData *pBlock, int32_t irow, SRowKey *key) { key->ts = pBlock->aTSKEY[irow]; key->numOfPKs = 0; @@ -726,7 +730,7 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) { int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { int32_t code = 0; TSDBKEY key = TSDBROW_KEY(pRow); - SColVal *pColVal = &(SColVal){0}; + SColVal * pColVal = &(SColVal){0}; STColumn *pTColumn; int32_t iCol, jCol = 1; @@ -1058,8 +1062,8 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr SDelData *pDelData; int32_t code = 0; int32_t dataNum = eidx - sidx + 1; - SArray *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY)); - SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES); + SArray * aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY)); + SArray * pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES); taosArrayClear(aSkyline); for (int32_t i = sidx; i <= eidx; ++i) { @@ -1421,7 +1425,7 @@ int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SB SDiskDataHdr hdr = { .delimiter = TSDB_FILE_DLMT, - .fmtVer = 1, + .fmtVer = 2, .suid = bData->suid, .uid = bData->uid, .szUid = 0, // filled by compress key @@ -1474,7 +1478,7 @@ int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SB .offset = offset, .alg = cinfo.cmprAlg}; - code = tPutBlockCol(&buffers[2], &blockCol, hdr.fmtVer); + code = tPutBlockCol(&buffers[2], &blockCol, hdr.fmtVer, hdr.cmprAlg); TSDB_CHECK_CODE(code, lino, _exit); } hdr.szBlkCol = buffers[2].size; @@ -1513,7 +1517,8 @@ int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer * for (uint32_t startOffset = br2.offset; br2.offset - startOffset < hdr.szBlkCol;) { SBlockCol blockCol; - code = tGetBlockCol(&br2, &blockCol, hdr.fmtVer); + code = tGetBlockCol(&br2, &blockCol, hdr.fmtVer, hdr.cmprAlg); + if (blockCol.alg == 0) blockCol.alg = hdr.cmprAlg; TSDB_CHECK_CODE(code, lino, _exit); code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist); TSDB_CHECK_CODE(code, lino, _exit); @@ -1536,17 +1541,17 @@ int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr) { if ((code = tBufferPutI32v(buffer, pHdr->szKey))) return code; if ((code = tBufferPutI32v(buffer, pHdr->szBlkCol))) return code; if ((code = tBufferPutI32v(buffer, pHdr->nRow))) return code; - if (pHdr->fmtVer < 1) { + if (pHdr->fmtVer < 2) { if ((code = tBufferPutI8(buffer, pHdr->cmprAlg))) return code; - } else if (pHdr->fmtVer == 1) { + } else if (pHdr->fmtVer == 2) { if ((code = tBufferPutU32(buffer, pHdr->cmprAlg))) return code; } else { // more data fmt ver } - if (pHdr->fmtVer == 1) { + if (pHdr->fmtVer >= 1) { if ((code = tBufferPutI8(buffer, pHdr->numOfPKs))) return code; for (int i = 0; i < pHdr->numOfPKs; i++) { - if ((code = tPutBlockCol(buffer, &pHdr->primaryBlockCols[i], pHdr->fmtVer))) return code; + if ((code = tPutBlockCol(buffer, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg))) return code; } } @@ -1565,19 +1570,21 @@ int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr) { if ((code = tBufferGetI32v(br, &pHdr->szKey))) return code; if ((code = tBufferGetI32v(br, &pHdr->szBlkCol))) return code; if ((code = tBufferGetI32v(br, &pHdr->nRow))) return code; - if (pHdr->fmtVer < 1) { + if (pHdr->fmtVer < 2) { int8_t cmprAlg = 0; if ((code = tBufferGetI8(br, &cmprAlg))) return code; pHdr->cmprAlg = cmprAlg; - } else if (pHdr->fmtVer == 1) { + } else if (pHdr->fmtVer == 2) { if ((code = tBufferGetU32(br, &pHdr->cmprAlg))) return code; } else { // more data fmt ver } - if (pHdr->fmtVer == 1) { + if (pHdr->fmtVer >= 1) { if ((code = tBufferGetI8(br, &pHdr->numOfPKs))) return code; for (int i = 0; i < pHdr->numOfPKs; i++) { - if ((code = tGetBlockCol(br, &pHdr->primaryBlockCols[i], pHdr->fmtVer))) return code; + if ((code = tGetBlockCol(br, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg))) { + return code; + } } } else { pHdr->numOfPKs = 0; @@ -1655,7 +1662,7 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S ASSERT(hdr->numOfPKs <= TD_MAX_PK_COLS); SBlockCol *blockCol = &hdr->primaryBlockCols[hdr->numOfPKs]; - SColData *colData = tBlockDataGetColDataByIdx(bData, hdr->numOfPKs); + SColData * colData = tBlockDataGetColDataByIdx(bData, hdr->numOfPKs); if ((colData->cflag & COL_IS_KEY) == 0) { break;