Merge branch '3.0' into feature/compressData

This commit is contained in:
yihaoDeng 2024-04-02 04:20:39 +00:00
parent d85ae3c26e
commit 684055bc7c
4 changed files with 90 additions and 90 deletions

View File

@ -225,10 +225,10 @@ void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive);
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid); STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid);
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode); int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode);
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive); int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive);
SArray * tsdbMemTableGetTbDataArray(SMemTable *pMemTable); SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
// STbDataIter // STbDataIter
int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter); int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter);
void * tsdbTbDataIterDestroy(STbDataIter *pIter); void *tsdbTbDataIterDestroy(STbDataIter *pIter);
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter); void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter);
bool tsdbTbDataIterNext(STbDataIter *pIter); bool tsdbTbDataIterNext(STbDataIter *pIter);
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum); void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum);
@ -281,7 +281,7 @@ int32_t tsdbGetTableSchema(SMeta *pMeta, int64_t uid, STSchema **pSchema, int64_
// tsdbMerge.c ============================================================================================== // tsdbMerge.c ==============================================================================================
typedef struct { typedef struct {
STsdb * tsdb; STsdb *tsdb;
int32_t fid; int32_t fid;
} SMergeArg; } SMergeArg;
@ -312,22 +312,22 @@ int32_t tsdbDataIterNext2(STsdbDataIter2 *pIter, STsdbFilterInfo *pFilterInfo);
// structs ======================= // structs =======================
struct STsdbFS { struct STsdbFS {
SDelFile *pDelFile; SDelFile *pDelFile;
SArray * aDFileSet; // SArray<SDFileSet> SArray *aDFileSet; // SArray<SDFileSet>
}; };
typedef struct { typedef struct {
rocksdb_t * db; rocksdb_t *db;
rocksdb_comparator_t * my_comparator; rocksdb_comparator_t *my_comparator;
rocksdb_cache_t * blockcache; rocksdb_cache_t *blockcache;
rocksdb_block_based_table_options_t *tableoptions; rocksdb_block_based_table_options_t *tableoptions;
rocksdb_options_t * options; rocksdb_options_t *options;
rocksdb_flushoptions_t * flushoptions; rocksdb_flushoptions_t *flushoptions;
rocksdb_writeoptions_t * writeoptions; rocksdb_writeoptions_t *writeoptions;
rocksdb_readoptions_t * readoptions; rocksdb_readoptions_t *readoptions;
rocksdb_writebatch_t * writebatch; rocksdb_writebatch_t *writebatch;
rocksdb_writebatch_t * rwritebatch; rocksdb_writebatch_t *rwritebatch;
TdThreadMutex rMutex; TdThreadMutex rMutex;
STSchema * pTSchema; STSchema *pTSchema;
} SRocksCache; } SRocksCache;
typedef struct { typedef struct {
@ -336,22 +336,22 @@ typedef struct {
} SCacheFlushState; } SCacheFlushState;
struct STsdb { struct STsdb {
char * path; char *path;
SVnode * pVnode; SVnode *pVnode;
STsdbKeepCfg keepCfg; STsdbKeepCfg keepCfg;
TdThreadMutex mutex; TdThreadMutex mutex;
bool bgTaskDisabled; bool bgTaskDisabled;
SMemTable * mem; SMemTable *mem;
SMemTable * imem; SMemTable *imem;
STsdbFS fs; // old STsdbFS fs; // old
SLRUCache * lruCache; SLRUCache *lruCache;
SCacheFlushState flushState; SCacheFlushState flushState;
TdThreadMutex lruMutex; TdThreadMutex lruMutex;
SLRUCache * biCache; SLRUCache *biCache;
TdThreadMutex biMutex; TdThreadMutex biMutex;
SLRUCache * bCache; SLRUCache *bCache;
TdThreadMutex bMutex; TdThreadMutex bMutex;
SLRUCache * pgCache; SLRUCache *pgCache;
TdThreadMutex pgMutex; TdThreadMutex pgMutex;
struct STFileSystem *pFS; // new struct STFileSystem *pFS; // new
SRocksCache rCache; SRocksCache rCache;
@ -380,17 +380,17 @@ struct STbData {
TSKEY minKey; TSKEY minKey;
TSKEY maxKey; TSKEY maxKey;
SRWLatch lock; SRWLatch lock;
SDelData * pHead; SDelData *pHead;
SDelData * pTail; SDelData *pTail;
SMemSkipList sl; SMemSkipList sl;
STbData * next; STbData *next;
SRBTreeNode rbtn[1]; SRBTreeNode rbtn[1];
}; };
struct SMemTable { struct SMemTable {
SRWLatch latch; SRWLatch latch;
STsdb * pTsdb; STsdb *pTsdb;
SVBufPool * pPool; SVBufPool *pPool;
volatile int32_t nRef; volatile int32_t nRef;
int64_t minVer; int64_t minVer;
int64_t maxVer; int64_t maxVer;
@ -400,7 +400,7 @@ struct SMemTable {
int64_t nDel; int64_t nDel;
int32_t nTbData; int32_t nTbData;
int32_t nBucket; int32_t nBucket;
STbData ** aBucket; STbData **aBucket;
SRBTree tbDataTree[1]; SRBTree tbDataTree[1];
}; };
@ -409,7 +409,7 @@ struct TSDBROW {
union { union {
struct { struct {
int64_t version; int64_t version;
SRow * pTSRow; SRow *pTSRow;
}; };
struct { struct {
SBlockData *pBlockData; SBlockData *pBlockData;
@ -510,9 +510,9 @@ struct SBlockData {
int64_t suid; // 0 means normal table block data, otherwise child table block data int64_t suid; // 0 means normal table block data, otherwise child table block data
int64_t uid; // 0 means block data in .last file, otherwise in .data file int64_t uid; // 0 means block data in .last file, otherwise in .data file
int32_t nRow; // number of rows int32_t nRow; // number of rows
int64_t * aUid; // uids of each row, only exist in block data in .last file (uid == 0) int64_t *aUid; // uids of each row, only exist in block data in .last file (uid == 0)
int64_t * aVersion; // versions of each row int64_t *aVersion; // versions of each row
TSKEY * aTSKEY; // timestamp of each row TSKEY *aTSKEY; // timestamp of each row
int32_t nColData; int32_t nColData;
SColData *aColData; SColData *aColData;
}; };
@ -523,10 +523,10 @@ struct TABLEID {
}; };
struct STbDataIter { struct STbDataIter {
STbData * pTbData; STbData *pTbData;
int8_t backward; int8_t backward;
SMemSkipListNode *pNode; SMemSkipListNode *pNode;
TSDBROW * pRow; TSDBROW *pRow;
TSDBROW row; TSDBROW row;
}; };
@ -604,9 +604,9 @@ struct SDFileSet {
int32_t fid; int32_t fid;
SHeadFile *pHeadF; SHeadFile *pHeadF;
SDataFile *pDataF; SDataFile *pDataF;
SSmaFile * pSmaF; SSmaFile *pSmaF;
uint8_t nSttF; uint8_t nSttF;
SSttFile * aSttF[TSDB_STT_TRIGGER_ARRAY_SIZE]; SSttFile *aSttF[TSDB_STT_TRIGGER_ARRAY_SIZE];
}; };
struct STSDBRowIter { struct STSDBRowIter {
@ -622,18 +622,18 @@ struct STSDBRowIter {
struct SRowMerger { struct SRowMerger {
STSchema *pTSchema; STSchema *pTSchema;
int64_t version; int64_t version;
SArray * pArray; // SArray<SColVal> SArray *pArray; // SArray<SColVal>
}; };
typedef struct { typedef struct {
char * path; char *path;
int32_t szPage; int32_t szPage;
int32_t flag; int32_t flag;
TdFilePtr pFD; TdFilePtr pFD;
int64_t pgno; int64_t pgno;
uint8_t * pBuf; uint8_t *pBuf;
int64_t szFile; int64_t szFile;
STsdb * pTsdb; STsdb *pTsdb;
const char *objName; const char *objName;
uint8_t s3File; uint8_t s3File;
int32_t fid; int32_t fid;
@ -642,7 +642,7 @@ typedef struct {
} STsdbFD; } STsdbFD;
struct SDelFWriter { struct SDelFWriter {
STsdb * pTsdb; STsdb *pTsdb;
SDelFile fDel; SDelFile fDel;
STsdbFD *pWriteH; STsdbFD *pWriteH;
uint8_t *aBuf[1]; uint8_t *aBuf[1];
@ -702,15 +702,15 @@ int32_t tDeserializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo);
// snap read // snap read
struct STsdbReadSnap { struct STsdbReadSnap {
SMemTable * pMem; SMemTable *pMem;
SQueryNode * pNode; SQueryNode *pNode;
SMemTable * pIMem; SMemTable *pIMem;
SQueryNode * pINode; SQueryNode *pINode;
TFileSetArray *pfSetArray; TFileSetArray *pfSetArray;
}; };
struct SDataFWriter { struct SDataFWriter {
STsdb * pTsdb; STsdb *pTsdb;
SDFileSet wSet; SDFileSet wSet;
STsdbFD *pHeadFD; STsdbFD *pHeadFD;
@ -727,13 +727,13 @@ struct SDataFWriter {
}; };
struct SDataFReader { struct SDataFReader {
STsdb * pTsdb; STsdb *pTsdb;
SDFileSet *pSet; SDFileSet *pSet;
STsdbFD * pHeadFD; STsdbFD *pHeadFD;
STsdbFD * pDataFD; STsdbFD *pDataFD;
STsdbFD * pSmaFD; STsdbFD *pSmaFD;
STsdbFD * aSttFD[TSDB_STT_TRIGGER_ARRAY_SIZE]; STsdbFD *aSttFD[TSDB_STT_TRIGGER_ARRAY_SIZE];
uint8_t * aBuf[3]; uint8_t *aBuf[3];
}; };
// NOTE: do NOT change the order of the fields // NOTE: do NOT change the order of the fields
@ -766,10 +766,10 @@ typedef struct {
typedef struct SSttBlockLoadInfo { typedef struct SSttBlockLoadInfo {
SBlockDataInfo blockData[2]; // buffered block data SBlockDataInfo blockData[2]; // buffered block data
SArray * aSttBlk; SArray *aSttBlk;
int32_t currentLoadBlockIndex; int32_t currentLoadBlockIndex;
STSchema * pSchema; STSchema *pSchema;
int16_t * colIds; int16_t *colIds;
int32_t numOfCols; int32_t numOfCols;
bool checkRemainingRow; // todo: no assign value? bool checkRemainingRow; // todo: no assign value?
bool isLast; bool isLast;
@ -806,7 +806,7 @@ struct SDiskData {
const uint8_t *pUid; const uint8_t *pUid;
const uint8_t *pVer; const uint8_t *pVer;
const uint8_t *pKey; const uint8_t *pKey;
SArray * aDiskCol; // SArray<SDiskCol> SArray *aDiskCol; // SArray<SDiskCol>
}; };
struct SDiskDataBuilder { struct SDiskDataBuilder {
@ -819,15 +819,15 @@ struct SDiskDataBuilder {
SCompressor *pVerC; SCompressor *pVerC;
SCompressor *pKeyC; SCompressor *pKeyC;
int32_t nBuilder; int32_t nBuilder;
SArray * aBuilder; // SArray<SDiskColBuilder> SArray *aBuilder; // SArray<SDiskColBuilder>
uint8_t * aBuf[2]; uint8_t *aBuf[2];
SDiskData dd; SDiskData dd;
SBlkInfo bi; SBlkInfo bi;
}; };
struct SLDataIter { struct SLDataIter {
SRBTreeNode node; SRBTreeNode node;
SSttBlk * pSttBlk; SSttBlk *pSttBlk;
int64_t cid; // for debug purpose int64_t cid; // for debug purpose
int8_t backward; int8_t backward;
int32_t iSttBlk; int32_t iSttBlk;
@ -836,7 +836,7 @@ struct SLDataIter {
uint64_t uid; uint64_t uid;
STimeWindow timeWindow; STimeWindow timeWindow;
SVersionRange verRange; SVersionRange verRange;
SSttBlockLoadInfo * pBlockLoadInfo; SSttBlockLoadInfo *pBlockLoadInfo;
SRowKey startRowKey; // current row key SRowKey startRowKey; // current row key
__compar_fn_t comparFn; __compar_fn_t comparFn;
bool ignoreEarlierTs; bool ignoreEarlierTs;
@ -851,22 +851,22 @@ typedef int32_t (*_load_tomb_fn)(STsdbReader *pReader, struct SSttFileReader *pS
typedef struct SMergeTreeConf { typedef struct SMergeTreeConf {
int8_t backward; int8_t backward;
STsdb * pTsdb; STsdb *pTsdb;
uint64_t suid; uint64_t suid;
uint64_t uid; uint64_t uid;
STimeWindow timewindow; STimeWindow timewindow;
SVersionRange verRange; SVersionRange verRange;
bool strictTimeRange; bool strictTimeRange;
SArray * pSttFileBlockIterArray; SArray *pSttFileBlockIterArray;
void * pCurrentFileset; void *pCurrentFileset;
STSchema * pSchema; STSchema *pSchema;
int16_t * pCols; int16_t *pCols;
int32_t numOfCols; int32_t numOfCols;
SRowKey * pCurRowKey; SRowKey *pCurRowKey;
_load_tomb_fn loadTombFn; _load_tomb_fn loadTombFn;
__compar_fn_t comparFn; __compar_fn_t comparFn;
void * pReader; void *pReader;
void * idstr; void *idstr;
bool rspRows; // response the rows in stt-file, if possible bool rspRows; // response the rows in stt-file, if possible
} SMergeTreeConf; } SMergeTreeConf;
@ -884,8 +884,8 @@ bool tMergeTreeIgnoreEarlierTs(SMergeTree *pMTree);
void tMergeTreeClose(SMergeTree *pMTree); void tMergeTreeClose(SMergeTree *pMTree);
SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols); SSttBlockLoadInfo *tCreateSttBlockLoadInfo(STSchema *pSchema, int16_t *colList, int32_t numOfCols);
void * destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo); void *destroySttBlockLoadInfo(SSttBlockLoadInfo *pLoadInfo);
void * destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost); void *destroySttBlockReader(SArray *pLDataIterArray, SSttBlockLoadCostInfo *pLoadCost);
// tsdbCache ============================================================================================== // tsdbCache ==============================================================================================
typedef enum { typedef enum {
@ -967,7 +967,7 @@ struct STsdbDataIter2 {
// TSDB_DATA_FILE_DATA_ITER // TSDB_DATA_FILE_DATA_ITER
struct { struct {
SDataFReader *pReader; SDataFReader *pReader;
SArray * aBlockIdx; // SArray<SBlockIdx> SArray *aBlockIdx; // SArray<SBlockIdx>
SMapData mDataBlk; SMapData mDataBlk;
SBlockData bData; SBlockData bData;
int32_t iBlockIdx; int32_t iBlockIdx;
@ -979,7 +979,7 @@ struct STsdbDataIter2 {
struct { struct {
SDataFReader *pReader; SDataFReader *pReader;
int32_t iStt; int32_t iStt;
SArray * aSttBlk; SArray *aSttBlk;
SBlockData bData; SBlockData bData;
int32_t iSttBlk; int32_t iSttBlk;
int32_t iRow; int32_t iRow;
@ -987,8 +987,8 @@ struct STsdbDataIter2 {
// TSDB_TOMB_FILE_DATA_ITER // TSDB_TOMB_FILE_DATA_ITER
struct { struct {
SDelFReader *pReader; SDelFReader *pReader;
SArray * aDelIdx; SArray *aDelIdx;
SArray * aDelData; SArray *aDelData;
int32_t iDelIdx; int32_t iDelIdx;
int32_t iDelData; int32_t iDelData;
} tIter; } tIter;

View File

@ -301,9 +301,9 @@ int32_t tsdbDataFileReadBlockDataByColumn(SDataFileReader *reader, const SBrinRe
int32_t lino = 0; int32_t lino = 0;
SDiskDataHdr hdr; SDiskDataHdr hdr;
SBuffer * buffer0 = reader->buffers + 0; SBuffer *buffer0 = reader->buffers + 0;
SBuffer * buffer1 = reader->buffers + 1; SBuffer *buffer1 = reader->buffers + 1;
SBuffer * assist = reader->buffers + 2; SBuffer *assist = reader->buffers + 2;
// load key part // load key part
tBufferClear(buffer0); tBufferClear(buffer0);

View File

@ -20,7 +20,7 @@
// SSttFReader ============================================================ // SSttFReader ============================================================
struct SSttFileReader { struct SSttFileReader {
SSttFileReaderConfig config[1]; SSttFileReaderConfig config[1];
STsdbFD * fd; STsdbFD *fd;
SSttFooter footer[1]; SSttFooter footer[1];
struct { struct {
bool sttBlkLoaded; bool sttBlkLoaded;
@ -31,7 +31,7 @@ struct SSttFileReader {
TStatisBlkArray statisBlkArray[1]; TStatisBlkArray statisBlkArray[1];
TTombBlkArray tombBlkArray[1]; TTombBlkArray tombBlkArray[1];
SBuffer local[10]; SBuffer local[10];
SBuffer * buffers; SBuffer *buffers;
}; };
// SSttFileReader // SSttFileReader
@ -96,7 +96,7 @@ int32_t tsdbSttFileReadStatisBlk(SSttFileReader *reader, const TStatisBlkArray *
ASSERT(reader->footer->statisBlkPtr->size % sizeof(SStatisBlk) == 0); ASSERT(reader->footer->statisBlkPtr->size % sizeof(SStatisBlk) == 0);
int32_t size = reader->footer->statisBlkPtr->size / sizeof(SStatisBlk); int32_t size = reader->footer->statisBlkPtr->size / sizeof(SStatisBlk);
void * data = taosMemoryMalloc(reader->footer->statisBlkPtr->size); void *data = taosMemoryMalloc(reader->footer->statisBlkPtr->size);
if (!data) return TSDB_CODE_OUT_OF_MEMORY; if (!data) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code = int32_t code =
@ -124,7 +124,7 @@ int32_t tsdbSttFileReadTombBlk(SSttFileReader *reader, const TTombBlkArray **tom
ASSERT(reader->footer->tombBlkPtr->size % sizeof(STombBlk) == 0); ASSERT(reader->footer->tombBlkPtr->size % sizeof(STombBlk) == 0);
int32_t size = reader->footer->tombBlkPtr->size / sizeof(STombBlk); int32_t size = reader->footer->tombBlkPtr->size / sizeof(STombBlk);
void * data = taosMemoryMalloc(reader->footer->tombBlkPtr->size); void *data = taosMemoryMalloc(reader->footer->tombBlkPtr->size);
if (!data) return TSDB_CODE_OUT_OF_MEMORY; if (!data) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code = int32_t code =
@ -152,7 +152,7 @@ int32_t tsdbSttFileReadSttBlk(SSttFileReader *reader, const TSttBlkArray **sttBl
ASSERT(reader->footer->sttBlkPtr->size % sizeof(SSttBlk) == 0); ASSERT(reader->footer->sttBlkPtr->size % sizeof(SSttBlk) == 0);
int32_t size = reader->footer->sttBlkPtr->size / sizeof(SSttBlk); int32_t size = reader->footer->sttBlkPtr->size / sizeof(SSttBlk);
void * data = taosMemoryMalloc(reader->footer->sttBlkPtr->size); void *data = taosMemoryMalloc(reader->footer->sttBlkPtr->size);
if (!data) return TSDB_CODE_OUT_OF_MEMORY; if (!data) return TSDB_CODE_OUT_OF_MEMORY;
int32_t code = int32_t code =
@ -203,9 +203,9 @@ int32_t tsdbSttFileReadBlockDataByColumn(SSttFileReader *reader, const SSttBlk *
int32_t lino = 0; int32_t lino = 0;
SDiskDataHdr hdr; SDiskDataHdr hdr;
SBuffer * buffer0 = reader->buffers + 0; SBuffer *buffer0 = reader->buffers + 0;
SBuffer * buffer1 = reader->buffers + 1; SBuffer *buffer1 = reader->buffers + 1;
SBuffer * assist = reader->buffers + 2; SBuffer *assist = reader->buffers + 2;
// load key part // load key part
tBufferClear(buffer0); tBufferClear(buffer0);

View File

@ -247,7 +247,7 @@ int32_t tCmprBlockIdx(void const *lhs, void const *rhs) {
int32_t tCmprBlockL(void const *lhs, void const *rhs) { int32_t tCmprBlockL(void const *lhs, void const *rhs) {
SBlockIdx *lBlockIdx = (SBlockIdx *)lhs; SBlockIdx *lBlockIdx = (SBlockIdx *)lhs;
SSttBlk * rBlockL = (SSttBlk *)rhs; SSttBlk *rBlockL = (SSttBlk *)rhs;
if (lBlockIdx->suid < rBlockL->suid) { if (lBlockIdx->suid < rBlockL->suid) {
return -1; return -1;
@ -730,7 +730,7 @@ SColVal *tsdbRowIterNext(STSDBRowIter *pIter) {
int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) { int32_t tsdbRowMergerAdd(SRowMerger *pMerger, TSDBROW *pRow, STSchema *pTSchema) {
int32_t code = 0; int32_t code = 0;
TSDBKEY key = TSDBROW_KEY(pRow); TSDBKEY key = TSDBROW_KEY(pRow);
SColVal * pColVal = &(SColVal){0}; SColVal *pColVal = &(SColVal){0};
STColumn *pTColumn; STColumn *pTColumn;
int32_t iCol, jCol = 1; int32_t iCol, jCol = 1;
@ -1062,8 +1062,8 @@ int32_t tsdbBuildDeleteSkyline(SArray *aDelData, int32_t sidx, int32_t eidx, SAr
SDelData *pDelData; SDelData *pDelData;
int32_t code = 0; int32_t code = 0;
int32_t dataNum = eidx - sidx + 1; int32_t dataNum = eidx - sidx + 1;
SArray * aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY)); SArray *aTmpSkyline = taosArrayInit(dataNum * 2, sizeof(TSDBKEY));
SArray * pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES); SArray *pSkyline = taosArrayInit(dataNum * 2, POINTER_BYTES);
taosArrayClear(aSkyline); taosArrayClear(aSkyline);
for (int32_t i = sidx; i <= eidx; ++i) { for (int32_t i = sidx; i <= eidx; ++i) {
@ -1662,7 +1662,7 @@ static int32_t tBlockDataCompressKeyPart(SBlockData *bData, SDiskDataHdr *hdr, S
ASSERT(hdr->numOfPKs <= TD_MAX_PK_COLS); ASSERT(hdr->numOfPKs <= TD_MAX_PK_COLS);
SBlockCol *blockCol = &hdr->primaryBlockCols[hdr->numOfPKs]; SBlockCol *blockCol = &hdr->primaryBlockCols[hdr->numOfPKs];
SColData * colData = tBlockDataGetColDataByIdx(bData, hdr->numOfPKs); SColData *colData = tBlockDataGetColDataByIdx(bData, hdr->numOfPKs);
if ((colData->cflag & COL_IS_KEY) == 0) { if ((colData->cflag & COL_IS_KEY) == 0) {
break; break;