refactor tsdb code
This commit is contained in:
parent
aeb21a536e
commit
931749dca6
|
@ -15,6 +15,7 @@
|
|||
|
||||
#include "taoserror.h"
|
||||
#include "tcoding.h"
|
||||
#include "tutil.h"
|
||||
|
||||
struct SBuffer {
|
||||
uint32_t size;
|
||||
|
@ -67,8 +68,7 @@ static FORCE_INLINE int32_t tBufferEnsureCapacity(SBuffer *buffer, uint32_t capa
|
|||
}
|
||||
|
||||
static FORCE_INLINE int32_t tBufferPut(SBuffer *buffer, const void *data, uint32_t size) {
|
||||
int32_t code = tBufferEnsureCapacity(buffer, buffer->size + size);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferEnsureCapacity(buffer, buffer->size + size));
|
||||
memcpy((char *)buffer->data + buffer->size, data, size);
|
||||
buffer->size += size;
|
||||
return 0;
|
||||
|
@ -119,10 +119,8 @@ static FORCE_INLINE int32_t tBufferPutU16v(SBuffer *buffer, uint16_t value) { re
|
|||
static FORCE_INLINE int32_t tBufferPutU32v(SBuffer *buffer, uint32_t value) { return tBufferPutU64v(buffer, value); }
|
||||
|
||||
static FORCE_INLINE int32_t tBufferPutU64v(SBuffer *buffer, uint64_t value) {
|
||||
int32_t code;
|
||||
while (value >= 0x80) {
|
||||
code = tBufferPutU8(buffer, (value & 0x7F) | 0x80);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferPutU8(buffer, (value & 0x7F) | 0x80));
|
||||
value >>= 7;
|
||||
}
|
||||
return tBufferPutU8(buffer, value);
|
||||
|
@ -141,8 +139,7 @@ static FORCE_INLINE int32_t tBufferPutI64v(SBuffer *buffer, int64_t value) {
|
|||
}
|
||||
|
||||
static FORCE_INLINE int32_t tBufferPutBinary(SBuffer *buffer, const void *data, uint32_t size) {
|
||||
int32_t code = tBufferPutU32v(buffer, size);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferPutU32v(buffer, size));
|
||||
return tBufferPut(buffer, data, size);
|
||||
}
|
||||
|
||||
|
@ -324,8 +321,7 @@ static int32_t tBufferGetF32(SBufferReader *reader, float *value) {
|
|||
float f;
|
||||
uint32_t u;
|
||||
} u;
|
||||
int32_t code = tBufferGetU32(reader, &u.u);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetU32(reader, &u.u));
|
||||
if (value) {
|
||||
*value = u.f;
|
||||
}
|
||||
|
@ -337,8 +333,7 @@ static int32_t tBufferGetF64(SBufferReader *reader, double *value) {
|
|||
double f;
|
||||
uint64_t u;
|
||||
} u;
|
||||
int32_t code = tBufferGetU64(reader, &u.u);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetU64(reader, &u.u));
|
||||
if (value) {
|
||||
*value = u.f;
|
||||
}
|
||||
|
|
|
@ -172,6 +172,8 @@ static FORCE_INLINE int32_t taosGetTbHashVal(const char *tbname, int32_t tblen,
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
#define TAOS_UNUSED(expr) (void)(expr)
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -147,7 +147,6 @@ int32_t tsdbRowKeyCmpr(const STsdbRowKey *key1, const STsdbRowKey *key2);
|
|||
void tsdbRowGetKey(TSDBROW *row, STsdbRowKey *key);
|
||||
void tColRowGetPrimaryKey(SBlockData *pBlock, int32_t irow, SRowKey *key);
|
||||
|
||||
|
||||
// STSDBRowIter
|
||||
int32_t tsdbRowIterOpen(STSDBRowIter *pIter, TSDBROW *pRow, STSchema *pTSchema);
|
||||
void tsdbRowClose(STSDBRowIter *pIter);
|
||||
|
@ -223,11 +222,11 @@ void tMapDataReset(SMapData *pMapData);
|
|||
void tMapDataClear(SMapData *pMapData);
|
||||
int32_t tMapDataPutItem(SMapData *pMapData, void *pItem, int32_t (*tPutItemFn)(uint8_t *, void *));
|
||||
int32_t tMapDataCopy(SMapData *pFrom, SMapData *pTo);
|
||||
void tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *));
|
||||
int32_t tMapDataGetItemByIdx(SMapData *pMapData, int32_t idx, void *pItem, int32_t (*tGetItemFn)(uint8_t *, void *));
|
||||
int32_t tMapDataSearch(SMapData *pMapData, void *pSearchItem, int32_t (*tGetItemFn)(uint8_t *, void *),
|
||||
int32_t (*tItemCmprFn)(const void *, const void *), void *pItem);
|
||||
int32_t tPutMapData(uint8_t *p, SMapData *pMapData);
|
||||
int32_t tGetMapData(uint8_t *p, SMapData *pMapData);
|
||||
int32_t tGetMapData(uint8_t *p, SMapData *pMapData, int64_t *decodeSize);
|
||||
int32_t tMapDataToArray(SMapData *pMapData, int32_t itemSize, int32_t (*tGetItemFn)(uint8_t *, void *),
|
||||
SArray **ppArray);
|
||||
// other
|
||||
|
@ -245,10 +244,10 @@ void tsdbMemTableDestroy(SMemTable *pMemTable, bool proactive);
|
|||
STbData *tsdbGetTbDataFromMemTable(SMemTable *pMemTable, tb_uid_t suid, tb_uid_t uid);
|
||||
int32_t tsdbRefMemTable(SMemTable *pMemTable, SQueryNode *pQNode);
|
||||
int32_t tsdbUnrefMemTable(SMemTable *pMemTable, SQueryNode *pNode, bool proactive);
|
||||
SArray * tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
|
||||
SArray *tsdbMemTableGetTbDataArray(SMemTable *pMemTable);
|
||||
// STbDataIter
|
||||
int32_t tsdbTbDataIterCreate(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter **ppIter);
|
||||
void * tsdbTbDataIterDestroy(STbDataIter *pIter);
|
||||
void *tsdbTbDataIterDestroy(STbDataIter *pIter);
|
||||
void tsdbTbDataIterOpen(STbData *pTbData, STsdbRowKey *pFrom, int8_t backward, STbDataIter *pIter);
|
||||
bool tsdbTbDataIterNext(STbDataIter *pIter);
|
||||
void tsdbMemTableCountRows(SMemTable *pMemTable, SSHashObj *pTableMap, int64_t *rowsNum);
|
||||
|
@ -301,7 +300,7 @@ int32_t tsdbGetTableSchema(SMeta *pMeta, int64_t uid, STSchema **pSchema, int64_
|
|||
|
||||
// tsdbMerge.c ==============================================================================================
|
||||
typedef struct {
|
||||
STsdb * tsdb;
|
||||
STsdb *tsdb;
|
||||
int32_t fid;
|
||||
} SMergeArg;
|
||||
|
||||
|
@ -332,22 +331,22 @@ int32_t tsdbDataIterNext2(STsdbDataIter2 *pIter, STsdbFilterInfo *pFilterInfo);
|
|||
// structs =======================
|
||||
struct STsdbFS {
|
||||
SDelFile *pDelFile;
|
||||
SArray * aDFileSet; // SArray<SDFileSet>
|
||||
SArray *aDFileSet; // SArray<SDFileSet>
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
rocksdb_t * db;
|
||||
rocksdb_comparator_t * my_comparator;
|
||||
rocksdb_cache_t * blockcache;
|
||||
rocksdb_t *db;
|
||||
rocksdb_comparator_t *my_comparator;
|
||||
rocksdb_cache_t *blockcache;
|
||||
rocksdb_block_based_table_options_t *tableoptions;
|
||||
rocksdb_options_t * options;
|
||||
rocksdb_flushoptions_t * flushoptions;
|
||||
rocksdb_writeoptions_t * writeoptions;
|
||||
rocksdb_readoptions_t * readoptions;
|
||||
rocksdb_writebatch_t * writebatch;
|
||||
rocksdb_writebatch_t * rwritebatch;
|
||||
rocksdb_options_t *options;
|
||||
rocksdb_flushoptions_t *flushoptions;
|
||||
rocksdb_writeoptions_t *writeoptions;
|
||||
rocksdb_readoptions_t *readoptions;
|
||||
rocksdb_writebatch_t *writebatch;
|
||||
rocksdb_writebatch_t *rwritebatch;
|
||||
TdThreadMutex rMutex;
|
||||
STSchema * pTSchema;
|
||||
STSchema *pTSchema;
|
||||
} SRocksCache;
|
||||
|
||||
typedef struct {
|
||||
|
@ -358,26 +357,26 @@ typedef struct {
|
|||
typedef struct SCompMonitor SCompMonitor;
|
||||
|
||||
struct STsdb {
|
||||
char * path;
|
||||
SVnode * pVnode;
|
||||
char *path;
|
||||
SVnode *pVnode;
|
||||
STsdbKeepCfg keepCfg;
|
||||
TdThreadMutex mutex;
|
||||
bool bgTaskDisabled;
|
||||
SMemTable * mem;
|
||||
SMemTable * imem;
|
||||
SMemTable *mem;
|
||||
SMemTable *imem;
|
||||
STsdbFS fs; // old
|
||||
SLRUCache * lruCache;
|
||||
SLRUCache *lruCache;
|
||||
SCacheFlushState flushState;
|
||||
TdThreadMutex lruMutex;
|
||||
SLRUCache * biCache;
|
||||
SLRUCache *biCache;
|
||||
TdThreadMutex biMutex;
|
||||
SLRUCache * bCache;
|
||||
SLRUCache *bCache;
|
||||
TdThreadMutex bMutex;
|
||||
SLRUCache * pgCache;
|
||||
SLRUCache *pgCache;
|
||||
TdThreadMutex pgMutex;
|
||||
struct STFileSystem *pFS; // new
|
||||
SRocksCache rCache;
|
||||
SCompMonitor *pCompMonitor;
|
||||
SCompMonitor *pCompMonitor;
|
||||
struct {
|
||||
SVHashTable *ht;
|
||||
SArray *arr;
|
||||
|
@ -405,17 +404,17 @@ struct STbData {
|
|||
TSKEY minKey;
|
||||
TSKEY maxKey;
|
||||
SRWLatch lock;
|
||||
SDelData * pHead;
|
||||
SDelData * pTail;
|
||||
SDelData *pHead;
|
||||
SDelData *pTail;
|
||||
SMemSkipList sl;
|
||||
STbData * next;
|
||||
STbData *next;
|
||||
SRBTreeNode rbtn[1];
|
||||
};
|
||||
|
||||
struct SMemTable {
|
||||
SRWLatch latch;
|
||||
STsdb * pTsdb;
|
||||
SVBufPool * pPool;
|
||||
STsdb *pTsdb;
|
||||
SVBufPool *pPool;
|
||||
volatile int32_t nRef;
|
||||
int64_t minVer;
|
||||
int64_t maxVer;
|
||||
|
@ -425,7 +424,7 @@ struct SMemTable {
|
|||
int64_t nDel;
|
||||
int32_t nTbData;
|
||||
int32_t nBucket;
|
||||
STbData ** aBucket;
|
||||
STbData **aBucket;
|
||||
SRBTree tbDataTree[1];
|
||||
};
|
||||
|
||||
|
@ -434,7 +433,7 @@ struct TSDBROW {
|
|||
union {
|
||||
struct {
|
||||
int64_t version;
|
||||
SRow * pTSRow;
|
||||
SRow *pTSRow;
|
||||
};
|
||||
struct {
|
||||
SBlockData *pBlockData;
|
||||
|
@ -535,9 +534,9 @@ struct SBlockData {
|
|||
int64_t suid; // 0 means normal table block data, otherwise child table block data
|
||||
int64_t uid; // 0 means block data in .last file, otherwise in .data file
|
||||
int32_t nRow; // number of rows
|
||||
int64_t * aUid; // uids of each row, only exist in block data in .last file (uid == 0)
|
||||
int64_t * aVersion; // versions of each row
|
||||
TSKEY * aTSKEY; // timestamp of each row
|
||||
int64_t *aUid; // uids of each row, only exist in block data in .last file (uid == 0)
|
||||
int64_t *aVersion; // versions of each row
|
||||
TSKEY *aTSKEY; // timestamp of each row
|
||||
int32_t nColData;
|
||||
SColData *aColData;
|
||||
};
|
||||
|
@ -548,10 +547,10 @@ struct TABLEID {
|
|||
};
|
||||
|
||||
struct STbDataIter {
|
||||
STbData * pTbData;
|
||||
STbData *pTbData;
|
||||
int8_t backward;
|
||||
SMemSkipListNode *pNode;
|
||||
TSDBROW * pRow;
|
||||
TSDBROW *pRow;
|
||||
TSDBROW row;
|
||||
};
|
||||
|
||||
|
@ -629,9 +628,9 @@ struct SDFileSet {
|
|||
int32_t fid;
|
||||
SHeadFile *pHeadF;
|
||||
SDataFile *pDataF;
|
||||
SSmaFile * pSmaF;
|
||||
SSmaFile *pSmaF;
|
||||
uint8_t nSttF;
|
||||
SSttFile * aSttF[TSDB_STT_TRIGGER_ARRAY_SIZE];
|
||||
SSttFile *aSttF[TSDB_STT_TRIGGER_ARRAY_SIZE];
|
||||
};
|
||||
|
||||
struct STSDBRowIter {
|
||||
|
@ -647,18 +646,18 @@ struct STSDBRowIter {
|
|||
struct SRowMerger {
|
||||
STSchema *pTSchema;
|
||||
int64_t version;
|
||||
SArray * pArray; // SArray<SColVal>
|
||||
SArray *pArray; // SArray<SColVal>
|
||||
};
|
||||
|
||||
typedef struct {
|
||||
char * path;
|
||||
char *path;
|
||||
int32_t szPage;
|
||||
int32_t flag;
|
||||
TdFilePtr pFD;
|
||||
int64_t pgno;
|
||||
uint8_t * pBuf;
|
||||
uint8_t *pBuf;
|
||||
int64_t szFile;
|
||||
STsdb * pTsdb;
|
||||
STsdb *pTsdb;
|
||||
const char *objName;
|
||||
uint8_t s3File;
|
||||
int32_t lcn;
|
||||
|
@ -668,7 +667,7 @@ typedef struct {
|
|||
} STsdbFD;
|
||||
|
||||
struct SDelFWriter {
|
||||
STsdb * pTsdb;
|
||||
STsdb *pTsdb;
|
||||
SDelFile fDel;
|
||||
STsdbFD *pWriteH;
|
||||
uint8_t *aBuf[1];
|
||||
|
@ -728,15 +727,15 @@ int32_t tDeserializeTsdbRepOpts(void *buf, int32_t bufLen, STsdbRepOpts *pInfo);
|
|||
|
||||
// snap read
|
||||
struct STsdbReadSnap {
|
||||
SMemTable * pMem;
|
||||
SQueryNode * pNode;
|
||||
SMemTable * pIMem;
|
||||
SQueryNode * pINode;
|
||||
SMemTable *pMem;
|
||||
SQueryNode *pNode;
|
||||
SMemTable *pIMem;
|
||||
SQueryNode *pINode;
|
||||
TFileSetArray *pfSetArray;
|
||||
};
|
||||
|
||||
struct SDataFWriter {
|
||||
STsdb * pTsdb;
|
||||
STsdb *pTsdb;
|
||||
SDFileSet wSet;
|
||||
|
||||
STsdbFD *pHeadFD;
|
||||
|
@ -753,13 +752,13 @@ struct SDataFWriter {
|
|||
};
|
||||
|
||||
struct SDataFReader {
|
||||
STsdb * pTsdb;
|
||||
STsdb *pTsdb;
|
||||
SDFileSet *pSet;
|
||||
STsdbFD * pHeadFD;
|
||||
STsdbFD * pDataFD;
|
||||
STsdbFD * pSmaFD;
|
||||
STsdbFD * aSttFD[TSDB_STT_TRIGGER_ARRAY_SIZE];
|
||||
uint8_t * aBuf[3];
|
||||
STsdbFD *pHeadFD;
|
||||
STsdbFD *pDataFD;
|
||||
STsdbFD *pSmaFD;
|
||||
STsdbFD *aSttFD[TSDB_STT_TRIGGER_ARRAY_SIZE];
|
||||
uint8_t *aBuf[3];
|
||||
};
|
||||
|
||||
// NOTE: do NOT change the order of the fields
|
||||
|
@ -794,10 +793,10 @@ typedef struct {
|
|||
|
||||
typedef struct SSttBlockLoadInfo {
|
||||
SBlockDataInfo blockData[2]; // buffered block data
|
||||
SArray * aSttBlk;
|
||||
SArray *aSttBlk;
|
||||
int32_t currentLoadBlockIndex;
|
||||
STSchema * pSchema;
|
||||
int16_t * colIds;
|
||||
STSchema *pSchema;
|
||||
int16_t *colIds;
|
||||
int32_t numOfCols;
|
||||
bool checkRemainingRow; // todo: no assign value?
|
||||
bool isLast;
|
||||
|
@ -834,7 +833,7 @@ struct SDiskData {
|
|||
const uint8_t *pUid;
|
||||
const uint8_t *pVer;
|
||||
const uint8_t *pKey;
|
||||
SArray * aDiskCol; // SArray<SDiskCol>
|
||||
SArray *aDiskCol; // SArray<SDiskCol>
|
||||
};
|
||||
|
||||
struct SDiskDataBuilder {
|
||||
|
@ -847,15 +846,15 @@ struct SDiskDataBuilder {
|
|||
SCompressor *pVerC;
|
||||
SCompressor *pKeyC;
|
||||
int32_t nBuilder;
|
||||
SArray * aBuilder; // SArray<SDiskColBuilder>
|
||||
uint8_t * aBuf[2];
|
||||
SArray *aBuilder; // SArray<SDiskColBuilder>
|
||||
uint8_t *aBuf[2];
|
||||
SDiskData dd;
|
||||
SBlkInfo bi;
|
||||
};
|
||||
|
||||
struct SLDataIter {
|
||||
SRBTreeNode node;
|
||||
SSttBlk * pSttBlk;
|
||||
SSttBlk *pSttBlk;
|
||||
int64_t cid; // for debug purpose
|
||||
int8_t backward;
|
||||
int32_t iSttBlk;
|
||||
|
@ -864,8 +863,8 @@ struct SLDataIter {
|
|||
uint64_t uid;
|
||||
STimeWindow timeWindow;
|
||||
SVersionRange verRange;
|
||||
SSttBlockLoadInfo * pBlockLoadInfo;
|
||||
SRowKey * pStartRowKey; // current row key
|
||||
SSttBlockLoadInfo *pBlockLoadInfo;
|
||||
SRowKey *pStartRowKey; // current row key
|
||||
bool ignoreEarlierTs;
|
||||
struct SSttFileReader *pReader;
|
||||
};
|
||||
|
@ -878,21 +877,21 @@ typedef int32_t (*_load_tomb_fn)(STsdbReader *pReader, struct SSttFileReader *pS
|
|||
|
||||
typedef struct SMergeTreeConf {
|
||||
int8_t backward;
|
||||
STsdb * pTsdb;
|
||||
STsdb *pTsdb;
|
||||
uint64_t suid;
|
||||
uint64_t uid;
|
||||
STimeWindow timewindow;
|
||||
SVersionRange verRange;
|
||||
bool strictTimeRange;
|
||||
SArray * pSttFileBlockIterArray;
|
||||
void * pCurrentFileset;
|
||||
STSchema * pSchema;
|
||||
int16_t * pCols;
|
||||
SArray *pSttFileBlockIterArray;
|
||||
void *pCurrentFileset;
|
||||
STSchema *pSchema;
|
||||
int16_t *pCols;
|
||||
int32_t numOfCols;
|
||||
SRowKey * pCurRowKey;
|
||||
SRowKey *pCurRowKey;
|
||||
_load_tomb_fn loadTombFn;
|
||||
void * pReader;
|
||||
void * idstr;
|
||||
void *pReader;
|
||||
void *idstr;
|
||||
bool rspRows; // response the rows in stt-file, if possible
|
||||
} SMergeTreeConf;
|
||||
|
||||
|
@ -1023,7 +1022,7 @@ struct STsdbDataIter2 {
|
|||
// TSDB_DATA_FILE_DATA_ITER
|
||||
struct {
|
||||
SDataFReader *pReader;
|
||||
SArray * aBlockIdx; // SArray<SBlockIdx>
|
||||
SArray *aBlockIdx; // SArray<SBlockIdx>
|
||||
SMapData mDataBlk;
|
||||
SBlockData bData;
|
||||
int32_t iBlockIdx;
|
||||
|
@ -1035,7 +1034,7 @@ struct STsdbDataIter2 {
|
|||
struct {
|
||||
SDataFReader *pReader;
|
||||
int32_t iStt;
|
||||
SArray * aSttBlk;
|
||||
SArray *aSttBlk;
|
||||
SBlockData bData;
|
||||
int32_t iSttBlk;
|
||||
int32_t iRow;
|
||||
|
@ -1043,8 +1042,8 @@ struct STsdbDataIter2 {
|
|||
// TSDB_TOMB_FILE_DATA_ITER
|
||||
struct {
|
||||
SDelFReader *pReader;
|
||||
SArray * aDelIdx;
|
||||
SArray * aDelData;
|
||||
SArray *aDelIdx;
|
||||
SArray *aDelData;
|
||||
int32_t iDelIdx;
|
||||
int32_t iDelData;
|
||||
} tIter;
|
||||
|
|
|
@ -803,11 +803,8 @@ int32_t tsdbReadDataBlk(SDataFReader *pReader, SBlockIdx *pBlockIdx, SMapData *m
|
|||
if (code) goto _err;
|
||||
|
||||
// decode
|
||||
int64_t n = tGetMapData(pReader->aBuf[0], mDataBlk);
|
||||
if (n < 0) {
|
||||
code = TSDB_CODE_OUT_OF_MEMORY;
|
||||
goto _err;
|
||||
}
|
||||
int64_t n;
|
||||
TAOS_CHECK_GOTO(tGetMapData(pReader->aBuf[0], mDataBlk, &n), NULL, _err);
|
||||
ASSERT(n == size);
|
||||
|
||||
return code;
|
||||
|
|
|
@ -37,8 +37,8 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
|
|||
int8_t cmprAlg;
|
||||
int32_t szPage;
|
||||
SBuffer buffers[10];
|
||||
int32_t encryptAlgorithm;
|
||||
char* encryptKey;
|
||||
int32_t encryptAlgorithm;
|
||||
char *encryptKey;
|
||||
// reader
|
||||
SArray *aBlockIdx;
|
||||
SMapData mDataBlk[1];
|
||||
|
@ -96,7 +96,7 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
|
|||
|
||||
for (int32_t iDataBlk = 0; iDataBlk < ctx->mDataBlk->nItem; ++iDataBlk) {
|
||||
SDataBlk dataBlk[1];
|
||||
tMapDataGetItemByIdx(ctx->mDataBlk, iDataBlk, dataBlk, tGetDataBlk);
|
||||
TAOS_CHECK_GOTO(tMapDataGetItemByIdx(ctx->mDataBlk, iDataBlk, dataBlk, tGetDataBlk), &lino, _exit);
|
||||
|
||||
SBrinRecord record = {
|
||||
.suid = pBlockIdx->suid,
|
||||
|
@ -139,8 +139,9 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
|
|||
|
||||
if (ctx->brinBlock->numOfRecords >= ctx->maxRow) {
|
||||
SVersionRange range = {.minVer = VERSION_MAX, .maxVer = VERSION_MIN};
|
||||
code = tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
|
||||
ctx->brinBlkArray, ctx->buffers, &range, ctx->encryptAlgorithm, ctx->encryptKey);
|
||||
code =
|
||||
tsdbFileWriteBrinBlock(ctx->fd, ctx->brinBlock, ctx->cmprAlg, &fset->farr[TSDB_FTYPE_HEAD]->f->size,
|
||||
ctx->brinBlkArray, ctx->buffers, &range, ctx->encryptAlgorithm, ctx->encryptKey);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
}
|
||||
|
@ -157,8 +158,8 @@ static int32_t tsdbUpgradeHead(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReader *
|
|||
&fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->encryptAlgorithm, ctx->encryptKey);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbFileWriteHeadFooter(ctx->fd, &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->footer, ctx->encryptAlgorithm,
|
||||
ctx->encryptKey);
|
||||
code = tsdbFileWriteHeadFooter(ctx->fd, &fset->farr[TSDB_FTYPE_HEAD]->f->size, ctx->footer, ctx->encryptAlgorithm,
|
||||
ctx->encryptKey);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbFsyncFile(ctx->fd, ctx->encryptAlgorithm, ctx->encryptKey);
|
||||
|
@ -258,7 +259,7 @@ static int32_t tsdbUpgradeSttFile(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReade
|
|||
struct {
|
||||
int32_t szPage;
|
||||
int32_t encryptAlgorithm;
|
||||
char* encryptKey;
|
||||
char *encryptKey;
|
||||
// writer
|
||||
STsdbFD *fd;
|
||||
TSttBlkArray sttBlkArray[1];
|
||||
|
@ -290,7 +291,7 @@ static int32_t tsdbUpgradeSttFile(STsdb *tsdb, SDFileSet *pDFileSet, SDataFReade
|
|||
}
|
||||
|
||||
code = tsdbFileWriteSttBlk(ctx->fd, ctx->sttBlkArray, ctx->footer->sttBlkPtr, &fobj->f->size, ctx->encryptAlgorithm,
|
||||
ctx->encryptKey);
|
||||
ctx->encryptKey);
|
||||
TSDB_CHECK_CODE(code, lino, _exit1);
|
||||
|
||||
code = tsdbFileWriteSttFooter(ctx->fd, ctx->footer, &fobj->f->size, ctx->encryptAlgorithm, ctx->encryptKey);
|
||||
|
@ -442,7 +443,7 @@ static int32_t tsdbUpgradeOpenTombFile(STsdb *tsdb, STFileSet *fset, STsdbFD **f
|
|||
|
||||
uint8_t hdr[TSDB_FHDR_SIZE] = {0};
|
||||
int32_t encryptAlgorithm = tsdb->pVnode->config.tsdbCfg.encryptAlgorithm;
|
||||
char* encryptKey = tsdb->pVnode->config.tsdbCfg.encryptKey;
|
||||
char *encryptKey = tsdb->pVnode->config.tsdbCfg.encryptKey;
|
||||
|
||||
code = tsdbWriteFile(fd[0], 0, hdr, TSDB_FHDR_SIZE, encryptAlgorithm, encryptKey);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
@ -467,8 +468,8 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *
|
|||
int64_t minKey;
|
||||
int64_t maxKey;
|
||||
SBuffer buffers[10];
|
||||
int32_t encryptAlgorithm;
|
||||
char* encryptKey;
|
||||
int32_t encryptAlgorithm;
|
||||
char *encryptKey;
|
||||
// reader
|
||||
SArray *aDelData;
|
||||
// writer
|
||||
|
@ -538,20 +539,20 @@ static int32_t tsdbDumpTombDataToFSet(STsdb *tsdb, SDelFReader *reader, SArray *
|
|||
|
||||
if (ctx->fd != NULL) {
|
||||
if (ctx->toStt) {
|
||||
code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->sttFooter->tombBlkPtr, &ctx->fobj->f->size,
|
||||
code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->sttFooter->tombBlkPtr, &ctx->fobj->f->size,
|
||||
ctx->encryptAlgorithm, ctx->encryptKey);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbFileWriteSttFooter(ctx->fd, ctx->sttFooter, &ctx->fobj->f->size, ctx->encryptAlgorithm,
|
||||
ctx->encryptKey);
|
||||
code =
|
||||
tsdbFileWriteSttFooter(ctx->fd, ctx->sttFooter, &ctx->fobj->f->size, ctx->encryptAlgorithm, ctx->encryptKey);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
} else {
|
||||
code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->tombFooter->tombBlkPtr, &ctx->fobj->f->size,
|
||||
code = tsdbFileWriteTombBlk(ctx->fd, ctx->tombBlkArray, ctx->tombFooter->tombBlkPtr, &ctx->fobj->f->size,
|
||||
ctx->encryptAlgorithm, ctx->encryptKey);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
|
||||
code = tsdbFileWriteTombFooter(ctx->fd, ctx->tombFooter, &ctx->fobj->f->size, ctx->encryptAlgorithm,
|
||||
ctx->encryptKey);
|
||||
code = tsdbFileWriteTombFooter(ctx->fd, ctx->tombFooter, &ctx->fobj->f->size, ctx->encryptAlgorithm,
|
||||
ctx->encryptKey);
|
||||
TSDB_CHECK_CODE(code, lino, _exit);
|
||||
}
|
||||
|
||||
|
|
File diff suppressed because it is too large
Load Diff
|
@ -17,17 +17,24 @@
|
|||
|
||||
// SDelBlock ----------
|
||||
int32_t tTombBlockInit(STombBlock *tombBlock) {
|
||||
int32_t code;
|
||||
|
||||
tombBlock->numOfRecords = 0;
|
||||
for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) {
|
||||
tBufferInit(&tombBlock->buffers[i]);
|
||||
TAOS_CHECK_GOTO(tBufferInit(&tombBlock->buffers[i]), NULL, _exit);
|
||||
}
|
||||
return 0;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TAOS_UNUSED(tTombBlockDestroy(tombBlock));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tTombBlockDestroy(STombBlock *tombBlock) {
|
||||
tombBlock->numOfRecords = 0;
|
||||
for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) {
|
||||
tBufferDestroy(&tombBlock->buffers[i]);
|
||||
TAOS_UNUSED(tBufferDestroy(&tombBlock->buffers[i]));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -35,15 +42,14 @@ int32_t tTombBlockDestroy(STombBlock *tombBlock) {
|
|||
int32_t tTombBlockClear(STombBlock *tombBlock) {
|
||||
tombBlock->numOfRecords = 0;
|
||||
for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) {
|
||||
tBufferClear(&tombBlock->buffers[i]);
|
||||
TAOS_UNUSED(tBufferClear(&tombBlock->buffers[i]));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tTombBlockPut(STombBlock *tombBlock, const STombRecord *record) {
|
||||
for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) {
|
||||
int32_t code = tBufferPutI64(&tombBlock->buffers[i], record->data[i]);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&tombBlock->buffers[i], record->data[i]));
|
||||
}
|
||||
tombBlock->numOfRecords++;
|
||||
return 0;
|
||||
|
@ -56,8 +62,7 @@ int32_t tTombBlockGet(STombBlock *tombBlock, int32_t idx, STombRecord *record) {
|
|||
|
||||
for (int32_t i = 0; i < TOMB_RECORD_ELEM_NUM; ++i) {
|
||||
SBufferReader br = BUFFER_READER_INITIALIZER(sizeof(int64_t) * idx, &tombBlock->buffers[i]);
|
||||
int32_t code = tBufferGetI64(&br, &record->data[i]);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&br, &record->data[i]));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -74,27 +79,34 @@ int32_t tTombRecordCompare(const STombRecord *r1, const STombRecord *r2) {
|
|||
|
||||
// STbStatisBlock ----------
|
||||
int32_t tStatisBlockInit(STbStatisBlock *statisBlock) {
|
||||
int32_t code;
|
||||
|
||||
statisBlock->numOfPKs = 0;
|
||||
statisBlock->numOfRecords = 0;
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) {
|
||||
tBufferInit(&statisBlock->buffers[i]);
|
||||
TAOS_CHECK_GOTO(tBufferInit(&statisBlock->buffers[i]), NULL, _exit);
|
||||
}
|
||||
for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) {
|
||||
tValueColumnInit(&statisBlock->firstKeyPKs[i]);
|
||||
tValueColumnInit(&statisBlock->lastKeyPKs[i]);
|
||||
TAOS_CHECK_GOTO(tValueColumnInit(&statisBlock->firstKeyPKs[i]), NULL, _exit);
|
||||
TAOS_CHECK_GOTO(tValueColumnInit(&statisBlock->lastKeyPKs[i]), NULL, _exit);
|
||||
}
|
||||
return 0;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TAOS_UNUSED(tStatisBlockDestroy(statisBlock));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tStatisBlockDestroy(STbStatisBlock *statisBlock) {
|
||||
statisBlock->numOfPKs = 0;
|
||||
statisBlock->numOfRecords = 0;
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) {
|
||||
tBufferDestroy(&statisBlock->buffers[i]);
|
||||
TAOS_UNUSED(tBufferDestroy(&statisBlock->buffers[i]));
|
||||
}
|
||||
for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) {
|
||||
tValueColumnDestroy(&statisBlock->firstKeyPKs[i]);
|
||||
tValueColumnDestroy(&statisBlock->lastKeyPKs[i]);
|
||||
TAOS_UNUSED(tValueColumnDestroy(&statisBlock->firstKeyPKs[i]));
|
||||
TAOS_UNUSED(tValueColumnDestroy(&statisBlock->lastKeyPKs[i]));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -103,17 +115,16 @@ int32_t tStatisBlockClear(STbStatisBlock *statisBlock) {
|
|||
statisBlock->numOfPKs = 0;
|
||||
statisBlock->numOfRecords = 0;
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(statisBlock->buffers); ++i) {
|
||||
tBufferClear(&statisBlock->buffers[i]);
|
||||
TAOS_UNUSED(tBufferClear(&statisBlock->buffers[i]));
|
||||
}
|
||||
for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) {
|
||||
tValueColumnClear(&statisBlock->firstKeyPKs[i]);
|
||||
tValueColumnClear(&statisBlock->lastKeyPKs[i]);
|
||||
TAOS_UNUSED(tValueColumnClear(&statisBlock->firstKeyPKs[i]));
|
||||
TAOS_UNUSED(tValueColumnClear(&statisBlock->lastKeyPKs[i]));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t tStatisBlockAppend(STbStatisBlock *block, SRowInfo *row) {
|
||||
int32_t code;
|
||||
STsdbRowKey key;
|
||||
|
||||
tsdbRowGetKey(&row->row, &key);
|
||||
|
@ -129,14 +140,14 @@ static int32_t tStatisBlockAppend(STbStatisBlock *block, SRowInfo *row) {
|
|||
}
|
||||
}
|
||||
|
||||
if ((code = tBufferPutI64(&block->suids, row->suid))) return code;
|
||||
if ((code = tBufferPutI64(&block->uids, row->uid))) return code;
|
||||
if ((code = tBufferPutI64(&block->firstKeyTimestamps, key.key.ts))) return code;
|
||||
if ((code = tBufferPutI64(&block->lastKeyTimestamps, key.key.ts))) return code;
|
||||
if ((code = tBufferPutI64(&block->counts, 1))) return code;
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&block->suids, row->suid));
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&block->uids, row->uid));
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&block->firstKeyTimestamps, key.key.ts));
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&block->lastKeyTimestamps, key.key.ts));
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&block->counts, 1));
|
||||
for (int32_t i = 0; i < block->numOfPKs; ++i) {
|
||||
if ((code = tValueColumnAppend(block->firstKeyPKs + i, key.key.pks + i))) return code;
|
||||
if ((code = tValueColumnAppend(block->lastKeyPKs + i, key.key.pks + i))) return code;
|
||||
TAOS_CHECK_RETURN(tValueColumnAppend(block->firstKeyPKs + i, key.key.pks + i));
|
||||
TAOS_CHECK_RETURN(tValueColumnAppend(block->lastKeyPKs + i, key.key.pks + i));
|
||||
}
|
||||
|
||||
block->numOfRecords++;
|
||||
|
@ -147,9 +158,8 @@ static int32_t tStatisBlockUpdate(STbStatisBlock *block, SRowInfo *row) {
|
|||
STbStatisRecord record;
|
||||
STsdbRowKey key;
|
||||
int32_t c;
|
||||
int32_t code;
|
||||
|
||||
tStatisBlockGet(block, block->numOfRecords - 1, &record);
|
||||
TAOS_CHECK_RETURN(tStatisBlockGet(block, block->numOfRecords - 1, &record));
|
||||
tsdbRowGetKey(&row->row, &key);
|
||||
|
||||
c = tRowKeyCompare(&record.lastKey, &key.key);
|
||||
|
@ -157,21 +167,18 @@ static int32_t tStatisBlockUpdate(STbStatisBlock *block, SRowInfo *row) {
|
|||
return 0;
|
||||
} else if (c < 0) {
|
||||
// last ts
|
||||
code = tBufferPutAt(&block->lastKeyTimestamps, (block->numOfRecords - 1) * sizeof(record.lastKey.ts), &key.key.ts,
|
||||
sizeof(key.key.ts));
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferPutAt(&block->lastKeyTimestamps, (block->numOfRecords - 1) * sizeof(record.lastKey.ts),
|
||||
&key.key.ts, sizeof(key.key.ts)));
|
||||
|
||||
// last primary keys
|
||||
for (int i = 0; i < block->numOfPKs; i++) {
|
||||
code = tValueColumnUpdate(&block->lastKeyPKs[i], block->numOfRecords - 1, &key.key.pks[i]);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tValueColumnUpdate(&block->lastKeyPKs[i], block->numOfRecords - 1, &key.key.pks[i]));
|
||||
}
|
||||
|
||||
// count
|
||||
record.count++;
|
||||
code = tBufferPutAt(&block->counts, (block->numOfRecords - 1) * sizeof(record.count), &record.count,
|
||||
sizeof(record.count));
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferPutAt(&block->counts, (block->numOfRecords - 1) * sizeof(record.count), &record.count,
|
||||
sizeof(record.count)));
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
|
@ -183,8 +190,7 @@ int32_t tStatisBlockPut(STbStatisBlock *block, SRowInfo *row, int32_t maxRecords
|
|||
if (block->numOfRecords > 0) {
|
||||
int64_t lastUid;
|
||||
SBufferReader br = BUFFER_READER_INITIALIZER(sizeof(int64_t) * (block->numOfRecords - 1), &block->uids);
|
||||
int32_t code = tBufferGetI64(&br, &lastUid);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&br, &lastUid));
|
||||
|
||||
if (lastUid == row->uid) {
|
||||
return tStatisBlockUpdate(block, row);
|
||||
|
@ -196,7 +202,6 @@ int32_t tStatisBlockPut(STbStatisBlock *block, SRowInfo *row, int32_t maxRecords
|
|||
}
|
||||
|
||||
int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecord *record) {
|
||||
int32_t code;
|
||||
SBufferReader reader;
|
||||
|
||||
if (idx < 0 || idx >= statisBlock->numOfRecords) {
|
||||
|
@ -204,36 +209,29 @@ int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecor
|
|||
}
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->suid), &statisBlock->suids);
|
||||
code = tBufferGetI64(&reader, &record->suid);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->suid));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->uid), &statisBlock->uids);
|
||||
code = tBufferGetI64(&reader, &record->uid);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->uid));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->firstKey.ts), &statisBlock->firstKeyTimestamps);
|
||||
code = tBufferGetI64(&reader, &record->firstKey.ts);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->firstKey.ts));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->lastKey.ts), &statisBlock->lastKeyTimestamps);
|
||||
code = tBufferGetI64(&reader, &record->lastKey.ts);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->lastKey.ts));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(record->count), &statisBlock->counts);
|
||||
code = tBufferGetI64(&reader, &record->count);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->count));
|
||||
|
||||
// primary keys
|
||||
for (record->firstKey.numOfPKs = 0; record->firstKey.numOfPKs < statisBlock->numOfPKs; record->firstKey.numOfPKs++) {
|
||||
code = tValueColumnGet(&statisBlock->firstKeyPKs[record->firstKey.numOfPKs], idx,
|
||||
&record->firstKey.pks[record->firstKey.numOfPKs]);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tValueColumnGet(&statisBlock->firstKeyPKs[record->firstKey.numOfPKs], idx,
|
||||
&record->firstKey.pks[record->firstKey.numOfPKs]));
|
||||
}
|
||||
|
||||
for (record->lastKey.numOfPKs = 0; record->lastKey.numOfPKs < statisBlock->numOfPKs; record->lastKey.numOfPKs++) {
|
||||
code = tValueColumnGet(&statisBlock->lastKeyPKs[record->lastKey.numOfPKs], idx,
|
||||
&record->lastKey.pks[record->lastKey.numOfPKs]);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tValueColumnGet(&statisBlock->lastKeyPKs[record->lastKey.numOfPKs], idx,
|
||||
&record->lastKey.pks[record->lastKey.numOfPKs]));
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
@ -241,27 +239,34 @@ int32_t tStatisBlockGet(STbStatisBlock *statisBlock, int32_t idx, STbStatisRecor
|
|||
|
||||
// SBrinRecord ----------
|
||||
int32_t tBrinBlockInit(SBrinBlock *brinBlock) {
|
||||
int32_t code;
|
||||
|
||||
brinBlock->numOfPKs = 0;
|
||||
brinBlock->numOfRecords = 0;
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) {
|
||||
tBufferInit(&brinBlock->buffers[i]);
|
||||
TAOS_CHECK_GOTO(tBufferInit(&brinBlock->buffers[i]), NULL, _exit);
|
||||
}
|
||||
for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) {
|
||||
tValueColumnInit(&brinBlock->firstKeyPKs[i]);
|
||||
tValueColumnInit(&brinBlock->lastKeyPKs[i]);
|
||||
TAOS_CHECK_GOTO(tValueColumnInit(&brinBlock->firstKeyPKs[i]), NULL, _exit);
|
||||
TAOS_CHECK_GOTO(tValueColumnInit(&brinBlock->lastKeyPKs[i]), NULL, _exit);
|
||||
}
|
||||
return 0;
|
||||
|
||||
_exit:
|
||||
if (code) {
|
||||
TAOS_UNUSED(tBrinBlockDestroy(brinBlock));
|
||||
}
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t tBrinBlockDestroy(SBrinBlock *brinBlock) {
|
||||
brinBlock->numOfPKs = 0;
|
||||
brinBlock->numOfRecords = 0;
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) {
|
||||
tBufferDestroy(&brinBlock->buffers[i]);
|
||||
TAOS_UNUSED(tBufferDestroy(&brinBlock->buffers[i]));
|
||||
}
|
||||
for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) {
|
||||
tValueColumnDestroy(&brinBlock->firstKeyPKs[i]);
|
||||
tValueColumnDestroy(&brinBlock->lastKeyPKs[i]);
|
||||
TAOS_UNUSED(tValueColumnDestroy(&brinBlock->firstKeyPKs[i]));
|
||||
TAOS_UNUSED(tValueColumnDestroy(&brinBlock->lastKeyPKs[i]));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -270,18 +275,16 @@ int32_t tBrinBlockClear(SBrinBlock *brinBlock) {
|
|||
brinBlock->numOfPKs = 0;
|
||||
brinBlock->numOfRecords = 0;
|
||||
for (int32_t i = 0; i < ARRAY_SIZE(brinBlock->buffers); ++i) {
|
||||
tBufferClear(&brinBlock->buffers[i]);
|
||||
TAOS_UNUSED(tBufferClear(&brinBlock->buffers[i]));
|
||||
}
|
||||
for (int32_t i = 0; i < TD_MAX_PK_COLS; ++i) {
|
||||
tValueColumnClear(&brinBlock->firstKeyPKs[i]);
|
||||
tValueColumnClear(&brinBlock->lastKeyPKs[i]);
|
||||
TAOS_UNUSED(tValueColumnClear(&brinBlock->firstKeyPKs[i]));
|
||||
TAOS_UNUSED(tValueColumnClear(&brinBlock->lastKeyPKs[i]));
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) {
|
||||
int32_t code;
|
||||
|
||||
ASSERT(record->firstKey.key.numOfPKs == record->lastKey.key.numOfPKs);
|
||||
|
||||
if (brinBlock->numOfRecords == 0) { // the first row
|
||||
|
@ -298,60 +301,29 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) {
|
|||
}
|
||||
}
|
||||
|
||||
code = tBufferPutI64(&brinBlock->suids, record->suid);
|
||||
if (code) return code;
|
||||
|
||||
code = tBufferPutI64(&brinBlock->uids, record->uid);
|
||||
if (code) return code;
|
||||
|
||||
code = tBufferPutI64(&brinBlock->firstKeyTimestamps, record->firstKey.key.ts);
|
||||
if (code) return code;
|
||||
|
||||
code = tBufferPutI64(&brinBlock->firstKeyVersions, record->firstKey.version);
|
||||
if (code) return code;
|
||||
|
||||
code = tBufferPutI64(&brinBlock->lastKeyTimestamps, record->lastKey.key.ts);
|
||||
if (code) return code;
|
||||
|
||||
code = tBufferPutI64(&brinBlock->lastKeyVersions, record->lastKey.version);
|
||||
if (code) return code;
|
||||
|
||||
code = tBufferPutI64(&brinBlock->minVers, record->minVer);
|
||||
if (code) return code;
|
||||
|
||||
code = tBufferPutI64(&brinBlock->maxVers, record->maxVer);
|
||||
if (code) return code;
|
||||
|
||||
code = tBufferPutI64(&brinBlock->blockOffsets, record->blockOffset);
|
||||
if (code) return code;
|
||||
|
||||
code = tBufferPutI64(&brinBlock->smaOffsets, record->smaOffset);
|
||||
if (code) return code;
|
||||
|
||||
code = tBufferPutI32(&brinBlock->blockSizes, record->blockSize);
|
||||
if (code) return code;
|
||||
|
||||
code = tBufferPutI32(&brinBlock->blockKeySizes, record->blockKeySize);
|
||||
if (code) return code;
|
||||
|
||||
code = tBufferPutI32(&brinBlock->smaSizes, record->smaSize);
|
||||
if (code) return code;
|
||||
|
||||
code = tBufferPutI32(&brinBlock->numRows, record->numRow);
|
||||
if (code) return code;
|
||||
|
||||
code = tBufferPutI32(&brinBlock->counts, record->count);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->suids, record->suid));
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->uids, record->uid));
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->firstKeyTimestamps, record->firstKey.key.ts));
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->firstKeyVersions, record->firstKey.version));
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->lastKeyTimestamps, record->lastKey.key.ts));
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->lastKeyVersions, record->lastKey.version));
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->minVers, record->minVer));
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->maxVers, record->maxVer));
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->blockOffsets, record->blockOffset));
|
||||
TAOS_CHECK_RETURN(tBufferPutI64(&brinBlock->smaOffsets, record->smaOffset));
|
||||
TAOS_CHECK_RETURN(tBufferPutI32(&brinBlock->blockSizes, record->blockSize));
|
||||
TAOS_CHECK_RETURN(tBufferPutI32(&brinBlock->blockKeySizes, record->blockKeySize));
|
||||
TAOS_CHECK_RETURN(tBufferPutI32(&brinBlock->smaSizes, record->smaSize));
|
||||
TAOS_CHECK_RETURN(tBufferPutI32(&brinBlock->numRows, record->numRow));
|
||||
TAOS_CHECK_RETURN(tBufferPutI32(&brinBlock->counts, record->count));
|
||||
|
||||
if (brinBlock->numOfPKs > 0) {
|
||||
for (int32_t i = 0; i < brinBlock->numOfPKs; ++i) {
|
||||
code = tValueColumnAppend(&brinBlock->firstKeyPKs[i], &record->firstKey.key.pks[i]);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tValueColumnAppend(&brinBlock->firstKeyPKs[i], &record->firstKey.key.pks[i]));
|
||||
}
|
||||
|
||||
for (int32_t i = 0; i < brinBlock->numOfPKs; ++i) {
|
||||
code = tValueColumnAppend(&brinBlock->lastKeyPKs[i], &record->lastKey.key.pks[i]);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tValueColumnAppend(&brinBlock->lastKeyPKs[i], &record->lastKey.key.pks[i]));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -361,7 +333,6 @@ int32_t tBrinBlockPut(SBrinBlock *brinBlock, const SBrinRecord *record) {
|
|||
}
|
||||
|
||||
int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record) {
|
||||
int32_t code;
|
||||
SBufferReader reader;
|
||||
|
||||
if (idx < 0 || idx >= brinBlock->numOfRecords) {
|
||||
|
@ -369,78 +340,61 @@ int32_t tBrinBlockGet(SBrinBlock *brinBlock, int32_t idx, SBrinRecord *record) {
|
|||
}
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->suids);
|
||||
code = tBufferGetI64(&reader, &record->suid);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->suid));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->uids);
|
||||
code = tBufferGetI64(&reader, &record->uid);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->uid));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->firstKeyTimestamps);
|
||||
code = tBufferGetI64(&reader, &record->firstKey.key.ts);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->firstKey.key.ts));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->firstKeyVersions);
|
||||
code = tBufferGetI64(&reader, &record->firstKey.version);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->firstKey.version));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->lastKeyTimestamps);
|
||||
code = tBufferGetI64(&reader, &record->lastKey.key.ts);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->lastKey.key.ts));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->lastKeyVersions);
|
||||
code = tBufferGetI64(&reader, &record->lastKey.version);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->lastKey.version));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->minVers);
|
||||
code = tBufferGetI64(&reader, &record->minVer);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->minVer));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->maxVers);
|
||||
code = tBufferGetI64(&reader, &record->maxVer);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->maxVer));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->blockOffsets);
|
||||
code = tBufferGetI64(&reader, &record->blockOffset);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->blockOffset));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int64_t), &brinBlock->smaOffsets);
|
||||
code = tBufferGetI64(&reader, &record->smaOffset);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI64(&reader, &record->smaOffset));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->blockSizes);
|
||||
code = tBufferGetI32(&reader, &record->blockSize);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI32(&reader, &record->blockSize));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->blockKeySizes);
|
||||
code = tBufferGetI32(&reader, &record->blockKeySize);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI32(&reader, &record->blockKeySize));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->smaSizes);
|
||||
code = tBufferGetI32(&reader, &record->smaSize);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI32(&reader, &record->smaSize));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->numRows);
|
||||
code = tBufferGetI32(&reader, &record->numRow);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI32(&reader, &record->numRow));
|
||||
|
||||
reader = BUFFER_READER_INITIALIZER(idx * sizeof(int32_t), &brinBlock->counts);
|
||||
code = tBufferGetI32(&reader, &record->count);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tBufferGetI32(&reader, &record->count));
|
||||
|
||||
// primary keys
|
||||
for (record->firstKey.key.numOfPKs = 0; record->firstKey.key.numOfPKs < brinBlock->numOfPKs;
|
||||
record->firstKey.key.numOfPKs++) {
|
||||
code = tValueColumnGet(&brinBlock->firstKeyPKs[record->firstKey.key.numOfPKs], idx,
|
||||
&record->firstKey.key.pks[record->firstKey.key.numOfPKs]);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tValueColumnGet(&brinBlock->firstKeyPKs[record->firstKey.key.numOfPKs], idx,
|
||||
&record->firstKey.key.pks[record->firstKey.key.numOfPKs]));
|
||||
}
|
||||
|
||||
for (record->lastKey.key.numOfPKs = 0; record->lastKey.key.numOfPKs < brinBlock->numOfPKs;
|
||||
record->lastKey.key.numOfPKs++) {
|
||||
code = tValueColumnGet(&brinBlock->lastKeyPKs[record->lastKey.key.numOfPKs], idx,
|
||||
&record->lastKey.key.pks[record->lastKey.key.numOfPKs]);
|
||||
if (code) return code;
|
||||
TAOS_CHECK_RETURN(tValueColumnGet(&brinBlock->lastKeyPKs[record->lastKey.key.numOfPKs], idx,
|
||||
&record->lastKey.key.pks[record->lastKey.key.numOfPKs]));
|
||||
}
|
||||
|
||||
return 0;
|
||||
|
|
Loading…
Reference in New Issue