fix invalid free

This commit is contained in:
yihaoDeng 2024-04-02 12:14:02 +08:00
parent 44e6d225e2
commit d85ae3c26e
4 changed files with 117 additions and 110 deletions

View File

@ -146,8 +146,8 @@ int32_t tTABLEIDCmprFn(const void *p1, const void *p2);
#define MIN_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) < 0) ? (KEY1) : (KEY2)) #define MIN_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) < 0) ? (KEY1) : (KEY2))
#define MAX_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) > 0) ? (KEY1) : (KEY2)) #define MAX_TSDBKEY(KEY1, KEY2) ((tsdbKeyCmprFn(&(KEY1), &(KEY2)) > 0) ? (KEY1) : (KEY2))
// SBlockCol // SBlockCol
int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver); int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver, uint32_t cmprAlg);
int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver); int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver, uint32_t cmprAlg);
int32_t tBlockColCmprFn(const void *p1, const void *p2); int32_t tBlockColCmprFn(const void *p1, const void *p2);
// SDataBlk // SDataBlk
void tDataBlkReset(SDataBlk *pBlock); void tDataBlkReset(SDataBlk *pBlock);
@ -225,10 +225,10 @@ void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive);
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid);
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode); int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode);
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive);
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable); SArray * tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
// STbDataIter // STbDataIter
int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter); int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter);
void *tsdbTbDataIterDestroy(STbDataIter *pIter); void * tsdbTbDataIterDestroy(STbDataIter *pIter);
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter); void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter);
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum); void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum);
@ -281,7 +281,7 @@ int32_t tsdbGetTableSchema(SMeta *pMeta, int64_t uid, STSchema **pSchema, int64_
// tsdbMerge.c ============================================================================================== // tsdbMerge.c ==============================================================================================
typedef struct { typedef struct {
STsdb *tsdb; STsdb * tsdb;
int32_t fid; int32_t fid;
} SMergeArg; } SMergeArg;
@ -312,22 +312,22 @@ int32_t tsdbDataIterNext2(STsdbDataIter2 *pIter, STsdbFilterInfo *pFilterInfo);
// structs ======================= // structs =======================
struct STsdbFS { struct STsdbFS {
SDelFile *pDelFile; SDelFile *pDelFile;
SArray *aDFileSet; // SArray<SDFileSet> SArray * aDFileSet; // SArray<SDFileSet>
}; };
typedef struct { typedef struct {
rocksdb_t *db; rocksdb_t * db;
rocksdb_comparator_t *my_comparator; rocksdb_comparator_t * my_comparator;
rocksdb_cache_t *blockcache; rocksdb_cache_t * blockcache;
rocksdb_block_based_table_options_t *tableoptions; rocksdb_block_based_table_options_t *tableoptions;
rocksdb_options_t *options; rocksdb_options_t * options;
rocksdb_flushoptions_t *flushoptions; rocksdb_flushoptions_t * flushoptions;
rocksdb_writeoptions_t *writeoptions; rocksdb_writeoptions_t * writeoptions;
rocksdb_readoptions_t *readoptions; rocksdb_readoptions_t * readoptions;
rocksdb_writebatch_t *writebatch; rocksdb_writebatch_t * writebatch;
rocksdb_writebatch_t *rwritebatch; rocksdb_writebatch_t * rwritebatch;
TdThreadMutex rMutex; TdThreadMutex rMutex;
STSchema *pTSchema; STSchema * pTSchema;
} SRocksCache; } SRocksCache;
typedef struct { typedef struct {
@ -336,22 +336,22 @@ typedef struct {
} SCacheFlushState; } SCacheFlushState;
struct STsdb { struct STsdb {
char *path; char * path;
SVnode *pVnode; SVnode * pVnode;
STsdbKeepCfg keepCfg; STsdbKeepCfg keepCfg;
TdThreadMutex mutex; TdThreadMutex mutex;
bool bgTaskDisabled; bool bgTaskDisabled;
SMemTable *mem; SMemTable * mem;
SMemTable *imem; SMemTable * imem;
STsdbFS fs; // old STsdbFS fs; // old
SLRUCache *lruCache; SLRUCache * lruCache;
SCacheFlushState flushState; SCacheFlushState flushState;
TdThreadMutex lruMutex; TdThreadMutex lruMutex;
SLRUCache *biCache; SLRUCache * biCache;
TdThreadMutex biMutex; TdThreadMutex biMutex;
SLRUCache *bCache; SLRUCache * bCache;
TdThreadMutex bMutex; TdThreadMutex bMutex;
SLRUCache *pgCache; SLRUCache * pgCache;
TdThreadMutex pgMutex; TdThreadMutex pgMutex;
struct STFileSystem *pFS; // new struct STFileSystem *pFS; // new
SRocksCache rCache; SRocksCache rCache;
@ -380,17 +380,17 @@ struct STbData {
TSKEY minKey; TSKEY minKey;
TSKEY maxKey; TSKEY maxKey;
SRWLatch lock; SRWLatch lock;
SDelData *pHead; SDelData * pHead;
SDelData *pTail; SDelData * pTail;
SMemSkipList sl; SMemSkipList sl;
STbData *next; STbData * next;
SRBTreeNode rbtn[1]; SRBTreeNode rbtn[1];
}; };
struct SMemTable { struct SMemTable {
SRWLatch latch; SRWLatch latch;
STsdb *pTsdb; STsdb * pTsdb;
SVBufPool *pPool; SVBufPool * pPool;
volatile int32_t nRef; volatile int32_t nRef;
int64_t minVer; int64_t minVer;
int64_t maxVer; int64_t maxVer;
@ -400,7 +400,7 @@ struct SMemTable {
int64_t nDel; int64_t nDel;
int32_t nTbData; int32_t nTbData;
int32_t nBucket; int32_t nBucket;
STbData **aBucket; STbData ** aBucket;
SRBTree tbDataTree[1]; SRBTree tbDataTree[1];
}; };
@ -409,7 +409,7 @@ struct TSDBROW {
union { union {
struct { struct {
int64_t version; int64_t version;
SRow *pTSRow; SRow * pTSRow;
}; };
struct { struct {
SBlockData *pBlockData; SBlockData *pBlockData;
@ -510,9 +510,9 @@ struct SBlockData {
int64_t suid; // 0 means normal table block data, otherwise child table block data int64_t suid; // 0 means normal table block data, otherwise child table block data
int64_t uid; // 0 means block data in .last file, otherwise in .data file int64_t uid; // 0 means block data in .last file, otherwise in .data file
int32_t nRow; // number of rows int32_t nRow; // number of rows
int64_t *aUid; // uids of each row, only exist in block data in .last file (uid == 0) int64_t * aUid; // uids of each row, only exist in block data in .last file (uid == 0)
int64_t *aVersion; // versions of each row int64_t * aVersion; // versions of each row
TSKEY *aTSKEY; // timestamp of each row TSKEY * aTSKEY; // timestamp of each row
int32_t nColData; int32_t nColData;
SColData *aColData; SColData *aColData;
}; };
@ -523,10 +523,10 @@ struct TABLEID {
}; };
struct STbDataIter { struct STbDataIter {
STbData *pTbData; STbData * pTbData;
int8_t backward; int8_t backward;
SMemSkipListNode *pNode; SMemSkipListNode *pNode;
TSDBROW *pRow; TSDBROW * pRow;
TSDBROW row; TSDBROW row;
}; };
@ -604,9 +604,9 @@ struct SDFileSet {
int32_t fid; int32_t fid;
SHeadFile *pHeadF; SHeadFile *pHeadF;
SDataFile *pDataF; SDataFile *pDataF;
SSmaFile *pSmaF; SSmaFile * pSmaF;
uint8_t nSttF; uint8_t nSttF;
SSttFile *aSttF[TSDB_STT_TRIGGER_ARRAY_SIZE]; SSttFile * aSttF[TSDB_STT_TRIGGER_ARRAY_SIZE];
}; };
struct STSDBRowIter { struct STSDBRowIter {
@ -622,18 +622,18 @@ struct STSDBRowIter {
struct SRowMerger { struct SRowMerger {
STSchema *pTSchema; STSchema *pTSchema;
int64_t version; int64_t version;
SArray *pArray; // SArray<SColVal> SArray * pArray; // SArray<SColVal>
}; };
typedef struct { typedef struct {
char *path; char * path;
int32_t szPage; int32_t szPage;
int32_t flag; int32_t flag;
TdFilePtr pFD; TdFilePtr pFD;
int64_t pgno; int64_t pgno;
uint8_t *pBuf; uint8_t * pBuf;
int64_t szFile; int64_t szFile;
STsdb *pTsdb; STsdb * pTsdb;
const char *objName; const char *objName;
uint8_t s3File; uint8_t s3File;
int32_t fid; int32_t fid;
@ -642,7 +642,7 @@ typedef struct {
} STsdbFD; } STsdbFD;
struct SDelFWriter { struct SDelFWriter {
STsdb *pTsdb; STsdb * pTsdb;
SDelFile fDel; SDelFile fDel;
STsdbFD *pWriteH; STsdbFD *pWriteH;
uint8_t *aBuf[1]; uint8_t *aBuf[1];
@ -702,15 +702,15 @@ int32_t tDeserializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo);
// snap read // snap read
struct STsdbReadSnap { struct STsdbReadSnap {
SMemTable *pMem; SMemTable * pMem;
SQueryNode *pNode; SQueryNode * pNode;
SMemTable *pIMem; SMemTable * pIMem;
SQueryNode *pINode; SQueryNode * pINode;
TFileSetArray *pfSetArray; TFileSetArray *pfSetArray;
}; };
struct SDataFWriter { struct SDataFWriter {
STsdb *pTsdb; STsdb * pTsdb;
SDFileSet wSet; SDFileSet wSet;
STsdbFD *pHeadFD; STsdbFD *pHeadFD;
@ -727,13 +727,13 @@ struct SDataFWriter {
}; };
struct SDataFReader { struct SDataFReader {
STsdb *pTsdb; STsdb * pTsdb;
SDFileSet *pSet; SDFileSet *pSet;
STsdbFD *pHeadFD; STsdbFD * pHeadFD;
STsdbFD *pDataFD; STsdbFD * pDataFD;
STsdbFD *pSmaFD; STsdbFD * pSmaFD;
STsdbFD *aSttFD[TSDB_STT_TRIGGER_ARRAY_SIZE]; STsdbFD * aSttFD[TSDB_STT_TRIGGER_ARRAY_SIZE];
uint8_t *aBuf[3]; uint8_t * aBuf[3];
}; };
// NOTE: do NOT change the order of the fields // NOTE: do NOT change the order of the fields
@ -766,10 +766,10 @@ typedef struct {
typedef struct SSttBlockLoadInfo { typedef struct SSttBlockLoadInfo {
SBlockDataInfo blockData[2]; // buffered block data SBlockDataInfo blockData[2]; // buffered block data
SArray *aSttBlk; SArray * aSttBlk;
int32_t currentLoadBlockIndex; int32_t currentLoadBlockIndex;
STSchema *pSchema; STSchema * pSchema;
int16_t *colIds; int16_t * colIds;
int32_t numOfCols; int32_t numOfCols;
bool checkRemainingRow; // todo: no assign value? bool checkRemainingRow; // todo: no assign value?
bool isLast; bool isLast;
@ -806,7 +806,7 @@ struct SDiskData {
const uint8_t *pUid; const uint8_t *pUid;
const uint8_t *pVer; const uint8_t *pVer;
const uint8_t *pKey; const uint8_t *pKey;
SArray *aDiskCol; // SArray<SDiskCol> SArray * aDiskCol; // SArray<SDiskCol>
}; };
struct SDiskDataBuilder { struct SDiskDataBuilder {
@ -819,15 +819,15 @@ struct SDiskDataBuilder {
SCompressor *pVerC; SCompressor *pVerC;
SCompressor *pKeyC; SCompressor *pKeyC;
int32_t nBuilder; int32_t nBuilder;
SArray *aBuilder; // SArray<SDiskColBuilder> SArray * aBuilder; // SArray<SDiskColBuilder>
uint8_t *aBuf[2]; uint8_t * aBuf[2];
SDiskData dd; SDiskData dd;
SBlkInfo bi; SBlkInfo bi;
}; };
struct SLDataIter { struct SLDataIter {
SRBTreeNode node; SRBTreeNode node;
SSttBlk *pSttBlk; SSttBlk * pSttBlk;
int64_t cid; // for debug purpose int64_t cid; // for debug purpose
int8_t backward; int8_t backward;
int32_t iSttBlk; int32_t iSttBlk;
@ -836,8 +836,8 @@ struct SLDataIter {
uint64_t uid; uint64_t uid;
STimeWindow timeWindow; STimeWindow timeWindow;
SVersionRange verRange; SVersionRange verRange;
SSttBlockLoadInfo *pBlockLoadInfo; SSttBlockLoadInfo * pBlockLoadInfo;
SRowKey startRowKey; // current row key SRowKey startRowKey; // current row key
__compar_fn_t comparFn; __compar_fn_t comparFn;
bool ignoreEarlierTs; bool ignoreEarlierTs;
struct SSttFileReader *pReader; struct SSttFileReader *pReader;
@ -851,22 +851,22 @@ typedef int32_t (*_load_tomb_fn)(STsdbReader *pReader, struct SSttFileReader *pS
typedef struct SMergeTreeConf { typedef struct SMergeTreeConf {
int8_t backward; int8_t backward;
STsdb *pTsdb; STsdb * pTsdb;
uint64_t suid; uint64_t suid;
uint64_t uid; uint64_t uid;
STimeWindow timewindow; STimeWindow timewindow;
SVersionRange verRange; SVersionRange verRange;
bool strictTimeRange; bool strictTimeRange;
SArray *pSttFileBlockIterArray; SArray * pSttFileBlockIterArray;
void *pCurrentFileset; void * pCurrentFileset;
STSchema *pSchema; STSchema * pSchema;
int16_t *pCols; int16_t * pCols;
int32_t numOfCols; int32_t numOfCols;
SRowKey *pCurRowKey; SRowKey * pCurRowKey;
_load_tomb_fn loadTombFn; _load_tomb_fn loadTombFn;
__compar_fn_t comparFn; __compar_fn_t comparFn;
void *pReader; void * pReader;
void *idstr; void * idstr;
bool rspRows; // response the rows in stt-file, if possible bool rspRows; // response the rows in stt-file, if possible
} SMergeTreeConf; } SMergeTreeConf;
@ -884,8 +884,8 @@ bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols); SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void * destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost); void * destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost);
// tsdbCache ============================================================================================== // tsdbCache ==============================================================================================
typedef enum { typedef enum {
@ -967,7 +967,7 @@ struct STsdbDataIter2 {
// TSDB_DATA_FILE_DATA_ITER // TSDB_DATA_FILE_DATA_ITER
struct { struct {
SDataFReader *pReader; SDataFReader *pReader;
SArray *aBlockIdx; // SArray<SBlockIdx> SArray * aBlockIdx; // SArray<SBlockIdx>
SMapData mDataBlk; SMapData mDataBlk;
SBlockData bData; SBlockData bData;
int32_t iBlockIdx; int32_t iBlockIdx;
@ -979,7 +979,7 @@ struct STsdbDataIter2 {
struct { struct {
SDataFReader *pReader; SDataFReader *pReader;
int32_t iStt; int32_t iStt;
SArray *aSttBlk; SArray * aSttBlk;
SBlockData bData; SBlockData bData;
int32_t iSttBlk; int32_t iSttBlk;
int32_t iRow; int32_t iRow;
@ -987,8 +987,8 @@ struct STsdbDataIter2 {
// TSDB_TOMB_FILE_DATA_ITER // TSDB_TOMB_FILE_DATA_ITER
struct { struct {
SDelFReader *pReader; SDelFReader *pReader;
SArray *aDelIdx; SArray * aDelIdx;
SArray *aDelData; SArray * aDelData;
int32_t iDelIdx; int32_t iDelIdx;
int32_t iDelData; int32_t iDelData;
} tIter; } tIter;

View File

@ -301,9 +301,9 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
int32_t lino = 0; int32_t lino = 0;
SDiskDataHdr hdr; SDiskDataHdr hdr;
SBuffer *buffer0 = reader->buffers + 0; SBuffer * buffer0 = reader->buffers + 0;
SBuffer *buffer1 = reader->buffers + 1; SBuffer * buffer1 = reader->buffers + 1;
SBuffer *assist = reader->buffers + 2; SBuffer * assist = reader->buffers + 2;
// load key part // load key part
tBufferClear(buffer0); tBufferClear(buffer0);
@ -363,7 +363,7 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
break; break;
} }
code = tGetBlockCol(&br, &blockCol, hdr.fmtVer); code = tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }

View File

@ -20,7 +20,7 @@
// SSttFReader ============================================================ // SSttFReader ============================================================
struct SSttFileReader { struct SSttFileReader {
SSttFileReaderConfig config[1]; SSttFileReaderConfig config[1];
STsdbFD *fd; STsdbFD * fd;
SSttFooter footer[1]; SSttFooter footer[1];
struct { struct {
bool sttBlkLoaded; bool sttBlkLoaded;
@ -31,7 +31,7 @@ struct SSttFileReader {
TStatisBlkArray statisBlkArray[1]; TStatisBlkArray statisBlkArray[1];
TTombBlkArray tombBlkArray[1]; TTombBlkArray tombBlkArray[1];
SBuffer local[10]; SBuffer local[10];
SBuffer *buffers; SBuffer * buffers;
}; };
// SSttFileReader // SSttFileReader
@ -96,7 +96,7 @@ int32_t tsdbSttFileReadStatisBlk(SSttFileReader *reader, const TStatisBlkArray *
ASSERT(reader->footer->statisBlkPtr->size % sizeof(SStatisBlk) == 0); ASSERT(reader->footer->statisBlkPtr->size % sizeof(SStatisBlk) == 0);
int32_t size = reader->footer->statisBlkPtr->size / sizeof(SStatisBlk); int32_t size = reader->footer->statisBlkPtr->size / sizeof(SStatisBlk);
void *data = taosMemoryMalloc(reader->footer->statisBlkPtr->size); void * data = taosMemoryMalloc(reader->footer->statisBlkPtr->size);
if (!data) return TSDB_CODE_OUT_OF_MEMORY; if (!data) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code = int32_t code =
@ -124,7 +124,7 @@ int32_t tsdbSttFileReadTombBlk(SSttFileReader *reader, const TTombBlkArray **tom
ASSERT(reader->footer->tombBlkPtr->size % sizeof(STombBlk) == 0); ASSERT(reader->footer->tombBlkPtr->size % sizeof(STombBlk) == 0);
int32_t size = reader->footer->tombBlkPtr->size / sizeof(STombBlk); int32_t size = reader->footer->tombBlkPtr->size / sizeof(STombBlk);
void *data = taosMemoryMalloc(reader->footer->tombBlkPtr->size); void * data = taosMemoryMalloc(reader->footer->tombBlkPtr->size);
if (!data) return TSDB_CODE_OUT_OF_MEMORY; if (!data) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code = int32_t code =
@ -152,7 +152,7 @@ int32_t tsdbSttFileReadSttBlk(SSttFileReader *reader, const TSttBlkArray **sttBl
ASSERT(reader->footer->sttBlkPtr->size % sizeof(SSttBlk) == 0); ASSERT(reader->footer->sttBlkPtr->size % sizeof(SSttBlk) == 0);
int32_t size = reader->footer->sttBlkPtr->size / sizeof(SSttBlk); int32_t size = reader->footer->sttBlkPtr->size / sizeof(SSttBlk);
void *data = taosMemoryMalloc(reader->footer->sttBlkPtr->size); void * data = taosMemoryMalloc(reader->footer->sttBlkPtr->size);
if (!data) return TSDB_CODE_OUT_OF_MEMORY; if (!data) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code = int32_t code =
@ -203,9 +203,9 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
int32_t lino = 0; int32_t lino = 0;
SDiskDataHdr hdr; SDiskDataHdr hdr;
SBuffer *buffer0 = reader->buffers + 0; SBuffer * buffer0 = reader->buffers + 0;
SBuffer *buffer1 = reader->buffers + 1; SBuffer * buffer1 = reader->buffers + 1;
SBuffer *assist = reader->buffers + 2; SBuffer * assist = reader->buffers + 2;
// load key part // load key part
tBufferClear(buffer0); tBufferClear(buffer0);
@ -265,7 +265,7 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
break; break;
} }
code = tGetBlockCol(&br, &blockCol, hdr.fmtVer); code = tGetBlockCol(&br, &blockCol, hdr.fmtVer, hdr.cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }

View File

@ -247,7 +247,7 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) {
int32_t tCmprBlockL(void const *lhs, void const *rhs) { int32_t tCmprBlockL(void const *lhs, void const *rhs) {
SBlockIdx *lBlockIdx = (SBlockIdx *)lhs; SBlockIdx *lBlockIdx = (SBlockIdx *)lhs;
SSttBlk *rBlockL = (SSttBlk *)rhs; SSttBlk * rBlockL = (SSttBlk *)rhs;
if (lBlockIdx->suid < rBlockL->suid) { if (lBlockIdx->suid < rBlockL->suid) {
return -1; return -1;
@ -388,9 +388,9 @@ int32_t tGetSttBlk(uint8_t *p, void *ph) {
// SBlockCol ====================================================== // SBlockCol ======================================================
static const int32_t BLOCK_WITH_ALG_VER = 1; static const int32_t BLOCK_WITH_ALG_VER = 2;
int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver) { int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver, uint32_t defaultCmprAlg) {
int32_t code; int32_t code;
ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE)); ASSERT(pBlockCol->flag && (pBlockCol->flag != HAS_NONE));
@ -418,11 +418,13 @@ int32_t tPutBlockCol(SBuffer *buffer, const SBlockCol *pBlockCol, int32_t ver) {
} }
if (ver >= BLOCK_WITH_ALG_VER) { if (ver >= BLOCK_WITH_ALG_VER) {
if ((code = tBufferPutU32(buffer, pBlockCol->alg))) return code; if ((code = tBufferPutU32(buffer, pBlockCol->alg))) return code;
} else {
if ((code = tBufferPutU32(buffer, defaultCmprAlg))) return code;
} }
return 0; return 0;
} }
int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver) { int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver, uint32_t defaultCmprAlg) {
int32_t code; int32_t code;
if ((code = tBufferGetI16v(br, &pBlockCol->cid))) return code; if ((code = tBufferGetI16v(br, &pBlockCol->cid))) return code;
@ -456,6 +458,8 @@ int32_t tGetBlockCol(SBufferReader *br, SBlockCol *pBlockCol, int32_t ver) {
if (ver >= BLOCK_WITH_ALG_VER) { if (ver >= BLOCK_WITH_ALG_VER) {
if ((code = tBufferGetU32(br, &pBlockCol->alg))) return code; if ((code = tBufferGetU32(br, &pBlockCol->alg))) return code;
} else {
pBlockCol->alg = defaultCmprAlg;
} }
return 0; return 0;
@ -624,7 +628,7 @@ void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key) {
} }
} }
void tColRowGetKey(SBlockData* pBlock, int32_t irow, SRowKey* key) { void tColRowGetKey(SBlockData *pBlock, int32_t irow, SRowKey *key) {
key->ts = pBlock->aTSKEY[irow]; key->ts = pBlock->aTSKEY[irow];
key->numOfPKs = 0; key->numOfPKs = 0;
@ -726,7 +730,7 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0; int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow); TSDBKEY key = TSDBROW_KEY(pRow);
SColVal *pColVal = &(SColVal){0}; SColVal * pColVal = &(SColVal){0};
STColumn *pTColumn; STColumn *pTColumn;
int32_t iCol, jCol = 1; int32_t iCol, jCol = 1;
@ -1058,8 +1062,8 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
SDelData *pDelData; SDelData *pDelData;
int32_t code = 0; int32_t code = 0;
int32_t dataNum = eidx - sidx + 1; int32_t dataNum = eidx - sidx + 1;
SArray *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY)); SArray * aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY));
SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES); SArray * pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES);
taosArrayClear(aSkyline); taosArrayClear(aSkyline);
for (int32_t i = sidx; i <= eidx; ++i) { for (int32_t i = sidx; i <= eidx; ++i) {
@ -1421,7 +1425,7 @@ int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SB
SDiskDataHdr hdr = { SDiskDataHdr hdr = {
.delimiter = TSDB_FILE_DLMT, .delimiter = TSDB_FILE_DLMT,
.fmtVer = 1, .fmtVer = 2,
.suid = bData->suid, .suid = bData->suid,
.uid = bData->uid, .uid = bData->uid,
.szUid = 0, // filled by compress key .szUid = 0, // filled by compress key
@ -1474,7 +1478,7 @@ int32_t tBlockDataCompress(SBlockData *bData, void *pCompr, SBuffer *buffers, SB
.offset = offset, .offset = offset,
.alg = cinfo.cmprAlg}; .alg = cinfo.cmprAlg};
code = tPutBlockCol(&buffers[2], &blockCol, hdr.fmtVer); code = tPutBlockCol(&buffers[2], &blockCol, hdr.fmtVer, hdr.cmprAlg);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
} }
hdr.szBlkCol = buffers[2].size; hdr.szBlkCol = buffers[2].size;
@ -1513,7 +1517,8 @@ int32_t tBlockDataDecompress(SBufferReader *br, SBlockData *blockData, SBuffer *
for (uint32_t startOffset = br2.offset; br2.offset - startOffset < hdr.szBlkCol;) { for (uint32_t startOffset = br2.offset; br2.offset - startOffset < hdr.szBlkCol;) {
SBlockCol blockCol; SBlockCol blockCol;
code = tGetBlockCol(&br2, &blockCol, hdr.fmtVer); code = tGetBlockCol(&br2, &blockCol, hdr.fmtVer, hdr.cmprAlg);
if (blockCol.alg == 0) blockCol.alg = hdr.cmprAlg;
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist); code = tBlockDataDecompressColData(&hdr, &blockCol, br, blockData, assist);
TSDB_CHECK_CODE(code, lino, _exit); TSDB_CHECK_CODE(code, lino, _exit);
@ -1536,17 +1541,17 @@ int32_t tPutDiskDataHdr(SBuffer *buffer, const SDiskDataHdr *pHdr) {
if ((code = tBufferPutI32v(buffer, pHdr->szKey))) return code; if ((code = tBufferPutI32v(buffer, pHdr->szKey))) return code;
if ((code = tBufferPutI32v(buffer, pHdr->szBlkCol))) return code; if ((code = tBufferPutI32v(buffer, pHdr->szBlkCol))) return code;
if ((code = tBufferPutI32v(buffer, pHdr->nRow))) return code; if ((code = tBufferPutI32v(buffer, pHdr->nRow))) return code;
if (pHdr->fmtVer < 1) { if (pHdr->fmtVer < 2) {
if ((code = tBufferPutI8(buffer, pHdr->cmprAlg))) return code; if ((code = tBufferPutI8(buffer, pHdr->cmprAlg))) return code;
} else if (pHdr->fmtVer == 1) { } else if (pHdr->fmtVer == 2) {
if ((code = tBufferPutU32(buffer, pHdr->cmprAlg))) return code; if ((code = tBufferPutU32(buffer, pHdr->cmprAlg))) return code;
} else { } else {
// more data fmt ver // more data fmt ver
} }
if (pHdr->fmtVer == 1) { if (pHdr->fmtVer >= 1) {
if ((code = tBufferPutI8(buffer, pHdr->numOfPKs))) return code; if ((code = tBufferPutI8(buffer, pHdr->numOfPKs))) return code;
for (int i = 0; i < pHdr->numOfPKs; i++) { for (int i = 0; i < pHdr->numOfPKs; i++) {
if ((code = tPutBlockCol(buffer, &pHdr->primaryBlockCols[i], pHdr->fmtVer))) return code; if ((code = tPutBlockCol(buffer, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg))) return code;
} }
} }
@ -1565,19 +1570,21 @@ int32_t tGetDiskDataHdr(SBufferReader *br, SDiskDataHdr *pHdr) {
if ((code = tBufferGetI32v(br, &pHdr->szKey))) return code; if ((code = tBufferGetI32v(br, &pHdr->szKey))) return code;
if ((code = tBufferGetI32v(br, &pHdr->szBlkCol))) return code; if ((code = tBufferGetI32v(br, &pHdr->szBlkCol))) return code;
if ((code = tBufferGetI32v(br, &pHdr->nRow))) return code; if ((code = tBufferGetI32v(br, &pHdr->nRow))) return code;
if (pHdr->fmtVer < 1) { if (pHdr->fmtVer < 2) {
int8_t cmprAlg = 0; int8_t cmprAlg = 0;
if ((code = tBufferGetI8(br, &cmprAlg))) return code; if ((code = tBufferGetI8(br, &cmprAlg))) return code;
pHdr->cmprAlg = cmprAlg; pHdr->cmprAlg = cmprAlg;
} else if (pHdr->fmtVer == 1) { } else if (pHdr->fmtVer == 2) {
if ((code = tBufferGetU32(br, &pHdr->cmprAlg))) return code; if ((code = tBufferGetU32(br, &pHdr->cmprAlg))) return code;
} else { } else {
// more data fmt ver // more data fmt ver
} }
if (pHdr->fmtVer == 1) { if (pHdr->fmtVer >= 1) {
if ((code = tBufferGetI8(br, &pHdr->numOfPKs))) return code; if ((code = tBufferGetI8(br, &pHdr->numOfPKs))) return code;
for (int i = 0; i < pHdr->numOfPKs; i++) { for (int i = 0; i < pHdr->numOfPKs; i++) {
if ((code = tGetBlockCol(br, &pHdr->primaryBlockCols[i], pHdr->fmtVer))) return code; if ((code = tGetBlockCol(br, &pHdr->primaryBlockCols[i], pHdr->fmtVer, pHdr->cmprAlg))) {
return code;
}
} }
} else { } else {
pHdr->numOfPKs = 0; pHdr->numOfPKs = 0;
@ -1655,7 +1662,7 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S
ASSERT(hdr->numOfPKs <= TD_MAX_PK_COLS); ASSERT(hdr->numOfPKs <= TD_MAX_PK_COLS);
SBlockCol *blockCol = &hdr->primaryBlockCols[hdr->numOfPKs]; SBlockCol *blockCol = &hdr->primaryBlockCols[hdr->numOfPKs];
SColData *colData = tBlockDataGetColDataByIdx(bData, hdr->numOfPKs); SColData * colData = tBlockDataGetColDataByIdx(bData, hdr->numOfPKs);
if ((colData->cflag & COL_IS_KEY) == 0) { if ((colData->cflag & COL_IS_KEY) == 0) {
break; break;