refact more
This commit is contained in:
parent
0e1a27c750
commit
94bcbf6bac
|
@ -29,7 +29,7 @@ target_sources(
|
||||||
"src/meta/metaTDBImpl.c"
|
"src/meta/metaTDBImpl.c"
|
||||||
|
|
||||||
# tsdb
|
# tsdb
|
||||||
"src/tsdb/tsdbBDBImpl.c"
|
# "src/tsdb/tsdbBDBImpl.c"
|
||||||
"src/tsdb/tsdbCommit.c"
|
"src/tsdb/tsdbCommit.c"
|
||||||
"src/tsdb/tsdbCompact.c"
|
"src/tsdb/tsdbCompact.c"
|
||||||
"src/tsdb/tsdbFile.c"
|
"src/tsdb/tsdbFile.c"
|
||||||
|
@ -40,7 +40,7 @@ target_sources(
|
||||||
"src/tsdb/tsdbRead.c"
|
"src/tsdb/tsdbRead.c"
|
||||||
"src/tsdb/tsdbReadImpl.c"
|
"src/tsdb/tsdbReadImpl.c"
|
||||||
"src/tsdb/tsdbScan.c"
|
"src/tsdb/tsdbScan.c"
|
||||||
"src/tsdb/tsdbSma.c"
|
# "src/tsdb/tsdbSma.c"
|
||||||
"src/tsdb/tsdbWrite.c"
|
"src/tsdb/tsdbWrite.c"
|
||||||
|
|
||||||
# tq
|
# tq
|
||||||
|
|
|
@ -20,8 +20,917 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
typedef struct SSmaStat SSmaStat;
|
||||||
|
typedef struct SSmaEnv SSmaEnv;
|
||||||
|
typedef struct SSmaEnvs SSmaEnvs;
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
TSDB_FILE_HEAD = 0, // .head
|
||||||
|
TSDB_FILE_DATA, // .data
|
||||||
|
TSDB_FILE_LAST, // .last
|
||||||
|
TSDB_FILE_SMAD, // .smad(Block-wise SMA)
|
||||||
|
TSDB_FILE_SMAL, // .smal(Block-wise SMA)
|
||||||
|
TSDB_FILE_MAX, //
|
||||||
|
TSDB_FILE_META, // meta
|
||||||
|
TSDB_FILE_TSMA, // v2t100.${sma_index_name}, Time-range-wise SMA
|
||||||
|
TSDB_FILE_RSMA, // v2r100.${sma_index_name}, Time-range-wise Rollup SMA
|
||||||
|
} E_TSDB_FILE_T;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint32_t magic;
|
||||||
|
uint32_t fver;
|
||||||
|
uint32_t len;
|
||||||
|
uint32_t totalBlocks;
|
||||||
|
uint32_t totalSubBlocks;
|
||||||
|
uint32_t offset;
|
||||||
|
uint64_t size;
|
||||||
|
uint64_t tombSize;
|
||||||
|
} SDFInfo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SDFInfo info;
|
||||||
|
STfsFile f;
|
||||||
|
TdFilePtr pFile;
|
||||||
|
uint8_t state;
|
||||||
|
} SDFile;
|
||||||
|
|
||||||
|
struct SSmaEnvs {
|
||||||
|
int16_t nTSma;
|
||||||
|
int16_t nRSma;
|
||||||
|
SSmaEnv *pTSmaEnv;
|
||||||
|
SSmaEnv *pRSmaEnv;
|
||||||
|
};
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int fid;
|
||||||
|
int8_t state; // -128~127
|
||||||
|
uint8_t ver; // 0~255, DFileSet version
|
||||||
|
uint16_t reserve;
|
||||||
|
SDFile files[TSDB_FILE_MAX];
|
||||||
|
} SDFileSet;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int minFid;
|
||||||
|
int midFid;
|
||||||
|
int maxFid;
|
||||||
|
TSKEY minKey;
|
||||||
|
} SRtn;
|
||||||
|
|
||||||
|
typedef struct STbData {
|
||||||
|
tb_uid_t uid;
|
||||||
|
TSKEY keyMin;
|
||||||
|
TSKEY keyMax;
|
||||||
|
int64_t nrows;
|
||||||
|
SSkipList *pData;
|
||||||
|
} STbData;
|
||||||
|
|
||||||
|
typedef struct STsdbMemTable {
|
||||||
|
T_REF_DECLARE()
|
||||||
|
SRWLatch latch;
|
||||||
|
TSKEY keyMin;
|
||||||
|
TSKEY keyMax;
|
||||||
|
uint64_t nRow;
|
||||||
|
SMemAllocator *pMA;
|
||||||
|
// Container
|
||||||
|
SSkipList *pSlIdx; // SSkiplist<STbData>
|
||||||
|
SHashObj * pHashIdx;
|
||||||
|
} STsdbMemTable;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint32_t version; // Commit version from 0 to increase
|
||||||
|
int64_t totalPoints; // total points
|
||||||
|
int64_t totalStorage; // Uncompressed total storage
|
||||||
|
} STsdbFSMeta;
|
||||||
|
|
||||||
|
// ==================
|
||||||
|
typedef struct {
|
||||||
|
STsdbFSMeta meta; // FS meta
|
||||||
|
SArray * df; // data file array
|
||||||
|
SArray * sf; // sma data file array v2f1900.index_name_1
|
||||||
|
} SFSStatus;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
TdThreadRwlock lock;
|
||||||
|
|
||||||
|
SFSStatus *cstatus; // current status
|
||||||
|
SHashObj * metaCache; // meta cache
|
||||||
|
SHashObj * metaCacheComp; // meta cache for compact
|
||||||
|
bool intxn;
|
||||||
|
SFSStatus *nstatus; // new status
|
||||||
|
} STsdbFS;
|
||||||
|
|
||||||
|
struct STsdb {
|
||||||
|
int32_t vgId;
|
||||||
|
bool repoLocked;
|
||||||
|
TdThreadMutex mutex;
|
||||||
|
char * path;
|
||||||
|
STsdbCfg config;
|
||||||
|
STsdbMemTable * mem;
|
||||||
|
STsdbMemTable * imem;
|
||||||
|
SRtn rtn;
|
||||||
|
SMemAllocatorFactory *pmaf;
|
||||||
|
STsdbFS * fs;
|
||||||
|
SMeta * pMeta;
|
||||||
|
STfs * pTfs;
|
||||||
|
SSmaEnvs smaEnvs;
|
||||||
|
};
|
||||||
|
|
||||||
|
#define REPO_ID(r) ((r)->vgId)
|
||||||
|
#define REPO_CFG(r) (&(r)->config)
|
||||||
|
#define REPO_FS(r) ((r)->fs)
|
||||||
|
#define REPO_META(r) ((r)->pMeta)
|
||||||
|
#define REPO_TFS(r) ((r)->pTfs)
|
||||||
|
#define IS_REPO_LOCKED(r) ((r)->repoLocked)
|
||||||
|
#define REPO_TSMA_NUM(r) ((r)->smaEnvs.nTSma)
|
||||||
|
#define REPO_RSMA_NUM(r) ((r)->smaEnvs.nRSma)
|
||||||
|
#define REPO_TSMA_ENV(r) ((r)->smaEnvs.pTSmaEnv)
|
||||||
|
#define REPO_RSMA_ENV(r) ((r)->smaEnvs.pRSmaEnv)
|
||||||
|
|
||||||
|
int tsdbLockRepo(STsdb *pTsdb);
|
||||||
|
int tsdbUnlockRepo(STsdb *pTsdb);
|
||||||
|
|
||||||
|
static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) {
|
||||||
|
return pTable->pSchema;
|
||||||
|
}
|
||||||
|
|
||||||
|
// tsdbLog
|
||||||
|
extern int32_t tsdbDebugFlag;
|
||||||
|
|
||||||
|
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
|
||||||
|
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
|
||||||
|
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
|
||||||
|
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
|
||||||
|
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
|
||||||
|
// tsdbMemTable.h
|
||||||
|
typedef struct {
|
||||||
|
int rowsInserted;
|
||||||
|
int rowsUpdated;
|
||||||
|
int rowsDeleteSucceed;
|
||||||
|
int rowsDeleteFailed;
|
||||||
|
int nOperations;
|
||||||
|
TSKEY keyFirst;
|
||||||
|
TSKEY keyLast;
|
||||||
|
} SMergeInfo;
|
||||||
|
|
||||||
|
static void * taosTMalloc(size_t size);
|
||||||
|
static void * taosTCalloc(size_t nmemb, size_t size);
|
||||||
|
static void * taosTRealloc(void *ptr, size_t size);
|
||||||
|
static void * taosTZfree(void *ptr);
|
||||||
|
static size_t taosTSizeof(void *ptr);
|
||||||
|
static void taosTMemset(void *ptr, int c);
|
||||||
|
|
||||||
|
STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb);
|
||||||
|
void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable);
|
||||||
|
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg, SSubmitRsp *pRsp);
|
||||||
|
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
|
||||||
|
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
|
||||||
|
|
||||||
|
static FORCE_INLINE STSRow *tsdbNextIterRow(SSkipListIterator *pIter) {
|
||||||
|
if (pIter == NULL) return NULL;
|
||||||
|
|
||||||
|
SSkipListNode *node = tSkipListIterGet(pIter);
|
||||||
|
if (node == NULL) return NULL;
|
||||||
|
|
||||||
|
return (STSRow *)SL_GET_NODE_DATA(node);
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator *pIter) {
|
||||||
|
STSRow *row = tsdbNextIterRow(pIter);
|
||||||
|
if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL;
|
||||||
|
|
||||||
|
return TD_ROW_KEY(row);
|
||||||
|
}
|
||||||
|
|
||||||
|
// tsdbOptions
|
||||||
|
extern const STsdbCfg defautlTsdbOptions;
|
||||||
|
|
||||||
|
int tsdbValidateOptions(const STsdbCfg *);
|
||||||
|
void tsdbOptionsCopy(STsdbCfg *pDest, const STsdbCfg *pSrc);
|
||||||
|
|
||||||
|
// tsdbReadImpl
|
||||||
|
typedef struct SReadH SReadH;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint32_t len;
|
||||||
|
uint32_t offset;
|
||||||
|
uint32_t hasLast : 2;
|
||||||
|
uint32_t numOfBlocks : 30;
|
||||||
|
uint64_t uid;
|
||||||
|
TSKEY maxKey;
|
||||||
|
} SBlockIdx;
|
||||||
|
|
||||||
|
#ifdef TD_REFACTOR_3
|
||||||
|
typedef struct {
|
||||||
|
int64_t last : 1;
|
||||||
|
int64_t offset : 63;
|
||||||
|
int32_t algorithm : 8;
|
||||||
|
int32_t numOfRows : 24;
|
||||||
|
int32_t len;
|
||||||
|
int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
|
||||||
|
int16_t numOfSubBlocks;
|
||||||
|
int16_t numOfCols; // not including timestamp column
|
||||||
|
TSKEY keyFirst;
|
||||||
|
TSKEY keyLast;
|
||||||
|
} SBlock;
|
||||||
|
|
||||||
|
#else
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
TSDB_SBLK_VER_0 = 0,
|
||||||
|
TSDB_SBLK_VER_MAX,
|
||||||
|
} ESBlockVer;
|
||||||
|
|
||||||
|
#define SBlockVerLatest TSDB_SBLK_VER_0
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint8_t last : 1;
|
||||||
|
uint8_t blkVer : 7;
|
||||||
|
uint8_t numOfSubBlocks;
|
||||||
|
col_id_t numOfCols; // not including timestamp column
|
||||||
|
uint32_t len; // data block length
|
||||||
|
uint32_t keyLen : 20; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
|
||||||
|
uint32_t algorithm : 4;
|
||||||
|
uint32_t reserve : 8;
|
||||||
|
col_id_t numOfBSma;
|
||||||
|
uint16_t numOfRows;
|
||||||
|
int64_t offset;
|
||||||
|
uint64_t aggrStat : 1;
|
||||||
|
uint64_t aggrOffset : 63;
|
||||||
|
TSKEY keyFirst;
|
||||||
|
TSKEY keyLast;
|
||||||
|
} SBlockV0;
|
||||||
|
|
||||||
|
#define SBlock SBlockV0 // latest SBlock definition
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t delimiter; // For recovery usage
|
||||||
|
int32_t tid;
|
||||||
|
uint64_t uid;
|
||||||
|
SBlock blocks[];
|
||||||
|
} SBlockInfo;
|
||||||
|
|
||||||
|
#ifdef TD_REFACTOR_3
|
||||||
|
typedef struct {
|
||||||
|
int16_t colId;
|
||||||
|
uint16_t bitmap : 1; // 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
|
||||||
|
uint16_t reserve : 15;
|
||||||
|
int32_t len;
|
||||||
|
uint32_t type : 8;
|
||||||
|
uint32_t offset : 24;
|
||||||
|
int64_t sum;
|
||||||
|
int64_t max;
|
||||||
|
int64_t min;
|
||||||
|
int16_t maxIndex;
|
||||||
|
int16_t minIndex;
|
||||||
|
int16_t numOfNull;
|
||||||
|
uint8_t offsetH;
|
||||||
|
char padding[1];
|
||||||
|
} SBlockCol;
|
||||||
|
#else
|
||||||
|
typedef struct {
|
||||||
|
int16_t colId;
|
||||||
|
uint16_t type : 6;
|
||||||
|
uint16_t blen : 10; // bitmap length(TODO: full UT for the bitmap compress of various data input)
|
||||||
|
uint32_t bitmap : 1; // 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
|
||||||
|
uint32_t len : 31; // data length + bitmap length
|
||||||
|
uint32_t offset;
|
||||||
|
} SBlockColV0;
|
||||||
|
|
||||||
|
#define SBlockCol SBlockColV0 // latest SBlockCol definition
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int16_t colId;
|
||||||
|
int16_t maxIndex;
|
||||||
|
int16_t minIndex;
|
||||||
|
int16_t numOfNull;
|
||||||
|
int64_t sum;
|
||||||
|
int64_t max;
|
||||||
|
int64_t min;
|
||||||
|
} SAggrBlkColV0;
|
||||||
|
|
||||||
|
#define SAggrBlkCol SAggrBlkColV0 // latest SAggrBlkCol definition
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
|
// Code here just for back-ward compatibility
|
||||||
|
static FORCE_INLINE void tsdbSetBlockColOffset(SBlockCol *pBlockCol, uint32_t offset) {
|
||||||
|
#ifdef TD_REFACTOR_3
|
||||||
|
pBlockCol->offset = offset & ((((uint32_t)1) << 24) - 1);
|
||||||
|
pBlockCol->offsetH = (uint8_t)(offset >> 24);
|
||||||
|
#else
|
||||||
|
pBlockCol->offset = offset;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE uint32_t tsdbGetBlockColOffset(SBlockCol *pBlockCol) {
|
||||||
|
#ifdef TD_REFACTOR_3
|
||||||
|
uint32_t offset1 = pBlockCol->offset;
|
||||||
|
uint32_t offset2 = pBlockCol->offsetH;
|
||||||
|
return (offset1 | (offset2 << 24));
|
||||||
|
#else
|
||||||
|
return pBlockCol->offset;
|
||||||
|
#endif
|
||||||
|
}
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t delimiter; // For recovery usage
|
||||||
|
int32_t numOfCols; // For recovery usage
|
||||||
|
uint64_t uid; // For recovery usage
|
||||||
|
SBlockCol cols[];
|
||||||
|
} SBlockData;
|
||||||
|
|
||||||
|
typedef void SAggrBlkData; // SBlockCol cols[];
|
||||||
|
|
||||||
|
struct SReadH {
|
||||||
|
STsdb *pRepo;
|
||||||
|
SDFileSet rSet; // FSET to read
|
||||||
|
SArray *aBlkIdx; // SBlockIdx array
|
||||||
|
STable *pTable; // table to read
|
||||||
|
SBlockIdx *pBlkIdx; // current reading table SBlockIdx
|
||||||
|
int cidx;
|
||||||
|
SBlockInfo *pBlkInfo;
|
||||||
|
SBlockData *pBlkData; // Block info
|
||||||
|
SAggrBlkData *pAggrBlkData; // Aggregate Block info
|
||||||
|
SDataCols *pDCols[2];
|
||||||
|
void *pBuf; // buffer
|
||||||
|
void *pCBuf; // compression buffer
|
||||||
|
void *pExBuf; // extra buffer
|
||||||
|
};
|
||||||
|
|
||||||
|
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
|
||||||
|
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
|
||||||
|
#define TSDB_READ_FSET(rh) (&((rh)->rSet))
|
||||||
|
#define TSDB_READ_TABLE(rh) ((rh)->pTable)
|
||||||
|
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
|
||||||
|
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
|
||||||
|
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
|
||||||
|
#define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD)
|
||||||
|
#define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL)
|
||||||
|
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
|
||||||
|
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
|
||||||
|
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
|
||||||
|
|
||||||
|
#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) \
|
||||||
|
(sizeof(SBlockData) + sizeof(SBlockColV##blkVer) * (ncols) + sizeof(TSCKSUM))
|
||||||
|
|
||||||
|
static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) {
|
||||||
|
switch (blkVer) {
|
||||||
|
case TSDB_SBLK_VER_0:
|
||||||
|
default:
|
||||||
|
return TSDB_BLOCK_STATIS_SIZE(nCols, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkColV##blkVer) * (ncols) + sizeof(TSCKSUM))
|
||||||
|
|
||||||
|
static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) {
|
||||||
|
switch (blkVer) {
|
||||||
|
case TSDB_SBLK_VER_0:
|
||||||
|
default:
|
||||||
|
return TSDB_BLOCK_AGGR_SIZE(nCols, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo);
|
||||||
|
void tsdbDestroyReadH(SReadH *pReadh);
|
||||||
|
int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet);
|
||||||
|
void tsdbCloseAndUnsetFSet(SReadH *pReadh);
|
||||||
|
int tsdbLoadBlockIdx(SReadH *pReadh);
|
||||||
|
int tsdbSetReadTable(SReadH *pReadh, STable *pTable);
|
||||||
|
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget);
|
||||||
|
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo);
|
||||||
|
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds,
|
||||||
|
int numOfColsIds);
|
||||||
|
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
|
||||||
|
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
|
||||||
|
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
|
||||||
|
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock);
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
|
||||||
|
void *pBuf = *ppBuf;
|
||||||
|
size_t tsize = taosTSizeof(pBuf);
|
||||||
|
|
||||||
|
if (tsize < size) {
|
||||||
|
if (tsize == 0) tsize = 1024;
|
||||||
|
|
||||||
|
while (tsize < size) {
|
||||||
|
tsize *= 2;
|
||||||
|
}
|
||||||
|
|
||||||
|
*ppBuf = taosTRealloc(pBuf, tsize);
|
||||||
|
if (*ppBuf == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// tsdbMemory
|
||||||
|
static FORCE_INLINE void *taosTMalloc(size_t size) {
|
||||||
|
if (size <= 0) return NULL;
|
||||||
|
|
||||||
|
void *ret = taosMemoryMalloc(size + sizeof(size_t));
|
||||||
|
if (ret == NULL) return NULL;
|
||||||
|
|
||||||
|
*(size_t *)ret = size;
|
||||||
|
|
||||||
|
return (void *)((char *)ret + sizeof(size_t));
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) {
|
||||||
|
size_t tsize = nmemb * size;
|
||||||
|
void * ret = taosTMalloc(tsize);
|
||||||
|
if (ret == NULL) return NULL;
|
||||||
|
|
||||||
|
taosTMemset(ret, 0);
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; }
|
||||||
|
|
||||||
|
static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); }
|
||||||
|
|
||||||
|
static FORCE_INLINE void * taosTRealloc(void *ptr, size_t size) {
|
||||||
|
if (ptr == NULL) return taosTMalloc(size);
|
||||||
|
|
||||||
|
if (size <= taosTSizeof(ptr)) return ptr;
|
||||||
|
|
||||||
|
void * tptr = (void *)((char *)ptr - sizeof(size_t));
|
||||||
|
size_t tsize = size + sizeof(size_t);
|
||||||
|
void* tptr1 = taosMemoryRealloc(tptr, tsize);
|
||||||
|
if (tptr1 == NULL) return NULL;
|
||||||
|
tptr = tptr1;
|
||||||
|
|
||||||
|
*(size_t *)tptr = size;
|
||||||
|
|
||||||
|
return (void *)((char *)tptr + sizeof(size_t));
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void* taosTZfree(void* ptr) {
|
||||||
|
if (ptr) {
|
||||||
|
taosMemoryFree((void*)((char*)ptr - sizeof(size_t)));
|
||||||
|
}
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
// tsdbCommit
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint64_t uid;
|
||||||
|
int64_t offset;
|
||||||
|
int64_t size;
|
||||||
|
} SKVRecord;
|
||||||
|
|
||||||
|
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn);
|
||||||
|
|
||||||
|
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
|
||||||
|
if (key < 0) {
|
||||||
|
return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
|
||||||
|
} else {
|
||||||
|
return (int)((key / tsTickPerDay[precision] / days));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
|
||||||
|
if (fid >= pRtn->maxFid) {
|
||||||
|
return 0;
|
||||||
|
} else if (fid >= pRtn->midFid) {
|
||||||
|
return 1;
|
||||||
|
} else if (fid >= pRtn->minFid) {
|
||||||
|
return 2;
|
||||||
|
} else {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// tsdbDBDef
|
||||||
|
// typedef struct SDBFile SDBFile;
|
||||||
|
// typedef DB_ENV* TDBEnv;
|
||||||
|
|
||||||
|
// struct SDBFile {
|
||||||
|
// int32_t fid;
|
||||||
|
// DB* pDB;
|
||||||
|
// char* path;
|
||||||
|
// };
|
||||||
|
|
||||||
|
// int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile* pDBF);
|
||||||
|
// void tsdbCloseDBF(SDBFile* pDBF);
|
||||||
|
// int32_t tsdbOpenBDBEnv(DB_ENV** ppEnv, const char* path);
|
||||||
|
// void tsdbCloseBDBEnv(DB_ENV* pEnv);
|
||||||
|
// int32_t tsdbSaveSmaToDB(SDBFile* pDBF, void* key, uint32_t keySize, void* data, uint32_t dataSize);
|
||||||
|
// void* tsdbGetSmaDataByKey(SDBFile* pDBF, void* key, uint32_t keySize, uint32_t* valueSize);
|
||||||
|
|
||||||
|
// tsdbFile
|
||||||
|
#define TSDB_FILE_HEAD_SIZE 512
|
||||||
|
#define TSDB_FILE_DELIMITER 0xF00AFA0F
|
||||||
|
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
|
||||||
|
#define TSDB_IVLD_FID INT_MIN
|
||||||
|
#define TSDB_FILE_STATE_OK 0
|
||||||
|
#define TSDB_FILE_STATE_BAD 1
|
||||||
|
|
||||||
|
#define TSDB_FILE_INFO(tf) (&((tf)->info))
|
||||||
|
#define TSDB_FILE_F(tf) (&((tf)->f))
|
||||||
|
#define TSDB_FILE_PFILE(tf) ((tf)->pFile)
|
||||||
|
#define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname)
|
||||||
|
#define TSDB_FILE_OPENED(tf) (TSDB_FILE_PFILE(tf) != NULL)
|
||||||
|
#define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf))
|
||||||
|
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_PFILE(f) = NULL)
|
||||||
|
#define TSDB_FILE_LEVEL(tf) (TSDB_FILE_F(tf)->did.level)
|
||||||
|
#define TSDB_FILE_ID(tf) (TSDB_FILE_F(tf)->did.id)
|
||||||
|
#define TSDB_FILE_DID(tf) (TSDB_FILE_F(tf)->did)
|
||||||
|
#define TSDB_FILE_REL_NAME(tf) (TSDB_FILE_F(tf)->rname)
|
||||||
|
#define TSDB_FILE_ABS_NAME(tf) (TSDB_FILE_F(tf)->aname)
|
||||||
|
#define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_PFILE(tf))
|
||||||
|
#define TSDB_FILE_STATE(tf) ((tf)->state)
|
||||||
|
#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s))
|
||||||
|
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
|
||||||
|
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
|
||||||
|
|
||||||
|
typedef int32_t TSDB_FILE_T;
|
||||||
|
typedef enum {
|
||||||
|
TSDB_FS_VER_0 = 0,
|
||||||
|
TSDB_FS_VER_MAX,
|
||||||
|
} ETsdbFsVer;
|
||||||
|
|
||||||
|
#define TSDB_LATEST_FVER TSDB_FS_VER_0 // latest version for DFile
|
||||||
|
#define TSDB_LATEST_SFS_VER TSDB_FS_VER_0 // latest version for 'current' file
|
||||||
|
|
||||||
|
static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile
|
||||||
|
switch (fType) {
|
||||||
|
case TSDB_FILE_HEAD: // .head
|
||||||
|
case TSDB_FILE_DATA: // .data
|
||||||
|
case TSDB_FILE_LAST: // .last
|
||||||
|
case TSDB_FILE_SMAD: // .smad(Block-wise SMA)
|
||||||
|
case TSDB_FILE_SMAL: // .smal(Block-wise SMA)
|
||||||
|
default:
|
||||||
|
return TSDB_LATEST_FVER;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
void tsdbInitDFile(STsdb *pRepo, SDFile* pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype);
|
||||||
|
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
|
||||||
|
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
|
||||||
|
void* tsdbDecodeSDFile(STsdb *pRepo, void* buf, SDFile* pDFile);
|
||||||
|
int tsdbCreateDFile(STsdb *pRepo, SDFile* pDFile, bool updateHeader, TSDB_FILE_T fType);
|
||||||
|
int tsdbUpdateDFileHeader(SDFile* pDFile);
|
||||||
|
int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo);
|
||||||
|
int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version);
|
||||||
|
|
||||||
|
static FORCE_INLINE void tsdbSetDFileInfo(SDFile* pDFile, SDFInfo* pInfo) { pDFile->info = *pInfo; }
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbOpenDFile(SDFile* pDFile, int flags) {
|
||||||
|
ASSERT(!TSDB_FILE_OPENED(pDFile));
|
||||||
|
|
||||||
|
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), flags);
|
||||||
|
if (pDFile->pFile == NULL) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void tsdbCloseDFile(SDFile* pDFile) {
|
||||||
|
if (TSDB_FILE_OPENED(pDFile)) {
|
||||||
|
taosCloseFile(&pDFile->pFile);
|
||||||
|
TSDB_FILE_SET_CLOSED(pDFile);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence) {
|
||||||
|
// ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||||
|
|
||||||
|
int64_t loffset = taosLSeekFile(TSDB_FILE_PFILE(pDFile), offset, whence);
|
||||||
|
if (loffset < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return loffset;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
|
||||||
|
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||||
|
|
||||||
|
int64_t nwrite = taosWriteFile(pDFile->pFile, buf, nbyte);
|
||||||
|
if (nwrite < nbyte) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return nwrite;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm) {
|
||||||
|
pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t*)(pCksm), sizeof(TSCKSUM));
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset) {
|
||||||
|
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||||
|
|
||||||
|
int64_t toffset;
|
||||||
|
|
||||||
|
if ((toffset = tsdbSeekDFile(pDFile, 0, SEEK_END)) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pDFile->info.size == toffset);
|
||||||
|
|
||||||
|
if (offset) {
|
||||||
|
*offset = toffset;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (tsdbWriteDFile(pDFile, buf, nbyte) < 0) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
pDFile->info.size += nbyte;
|
||||||
|
|
||||||
|
return (int)nbyte;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsRemoveFile(TSDB_FILE_F(pDFile)); }
|
||||||
|
|
||||||
|
static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
|
||||||
|
ASSERT(TSDB_FILE_OPENED(pDFile));
|
||||||
|
|
||||||
|
int64_t nread = taosReadFile(pDFile->pFile, buf, nbyte);
|
||||||
|
if (nread < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
return nread;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
|
||||||
|
if (tfsCopyFile(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(errno);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
|
||||||
|
tsdbSetDFileInfo(pDest, TSDB_FILE_INFO(pSrc));
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// =============== SDFileSet
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int fid;
|
||||||
|
int8_t state;
|
||||||
|
uint8_t ver;
|
||||||
|
uint16_t reserve;
|
||||||
|
#if 0
|
||||||
|
SDFInfo info;
|
||||||
|
#endif
|
||||||
|
STfsFile f;
|
||||||
|
TdFilePtr pFile;
|
||||||
|
|
||||||
|
} SSFile; // files split by days with fid
|
||||||
|
|
||||||
|
#define TSDB_LATEST_FSET_VER 0
|
||||||
|
|
||||||
|
#define TSDB_FSET_FID(s) ((s)->fid)
|
||||||
|
#define TSDB_FSET_STATE(s) ((s)->state)
|
||||||
|
#define TSDB_FSET_VER(s) ((s)->ver)
|
||||||
|
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
|
||||||
|
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
|
||||||
|
#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
|
||||||
|
#define TSDB_FSET_SET_CLOSED(s) \
|
||||||
|
do { \
|
||||||
|
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \
|
||||||
|
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \
|
||||||
|
} \
|
||||||
|
} while (0);
|
||||||
|
#define TSDB_FSET_FSYNC(s) \
|
||||||
|
do { \
|
||||||
|
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \
|
||||||
|
TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \
|
||||||
|
} \
|
||||||
|
} while (0);
|
||||||
|
|
||||||
|
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet* pSet, SDiskID did, int fid, uint32_t ver);
|
||||||
|
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
|
||||||
|
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
|
||||||
|
void* tsdbDecodeDFileSet(STsdb *pRepo, void* buf, SDFileSet* pSet);
|
||||||
|
int tsdbEncodeDFileSetEx(void** buf, SDFileSet* pSet);
|
||||||
|
void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet);
|
||||||
|
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
|
||||||
|
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet* pSet, bool updateHeader);
|
||||||
|
int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
|
||||||
|
int tsdbScanAndTryFixDFileSet(STsdb* pRepo, SDFileSet* pSet);
|
||||||
|
|
||||||
|
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) {
|
||||||
|
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||||
|
tsdbCloseDFile(TSDB_DFILE_IN_SET(pSet, ftype));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) {
|
||||||
|
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||||
|
if (tsdbOpenDFile(TSDB_DFILE_IN_SET(pSet, ftype), flags) < 0) {
|
||||||
|
tsdbCloseDFileSet(pSet);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void tsdbRemoveDFileSet(SDFileSet* pSet) {
|
||||||
|
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||||
|
(void)tsdbRemoveDFile(TSDB_DFILE_IN_SET(pSet, ftype));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) {
|
||||||
|
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||||
|
if (tsdbCopyDFile(TSDB_DFILE_IN_SET(pSrc, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) {
|
||||||
|
tsdbRemoveDFileSet(pDest);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY* minKey, TSKEY* maxKey) {
|
||||||
|
*minKey = fid * days * tsTickPerDay[precision];
|
||||||
|
*maxKey = *minKey + days * tsTickPerDay[precision] - 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) {
|
||||||
|
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
||||||
|
if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
// tsdbFS
|
||||||
|
// ================== TSDB global config
|
||||||
|
extern bool tsdbForceKeepFile;
|
||||||
|
|
||||||
|
// ================== CURRENT file header info
|
||||||
|
typedef struct {
|
||||||
|
uint32_t version; // Current file system version (relating to code)
|
||||||
|
uint32_t len; // Encode content length (including checksum)
|
||||||
|
} SFSHeader;
|
||||||
|
|
||||||
|
// ================== TSDB File System Meta
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @brief Directory structure of .tsma data files.
|
||||||
|
*
|
||||||
|
* /vnode2/tsdb $ tree tsma/
|
||||||
|
* tsma/
|
||||||
|
* ├── v2f100.index_name_1
|
||||||
|
* ├── v2f101.index_name_1
|
||||||
|
* ├── v2f102.index_name_1
|
||||||
|
* ├── v2f1900.index_name_3
|
||||||
|
* ├── v2f1901.index_name_3
|
||||||
|
* ├── v2f1902.index_name_3
|
||||||
|
* ├── v2f200.index_name_2
|
||||||
|
* ├── v2f201.index_name_2
|
||||||
|
* └── v2f202.index_name_2
|
||||||
|
*
|
||||||
|
* 0 directories, 9 files
|
||||||
|
*/
|
||||||
|
|
||||||
|
#define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus)
|
||||||
|
#define FS_NEW_STATUS(pfs) ((pfs)->nstatus)
|
||||||
|
#define FS_IN_TXN(pfs) (pfs)->intxn
|
||||||
|
#define FS_VERSION(pfs) ((pfs)->cstatus->meta.version)
|
||||||
|
#define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version)
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int direction;
|
||||||
|
uint64_t version; // current FS version
|
||||||
|
STsdbFS * pfs;
|
||||||
|
int index; // used to position next fset when version the same
|
||||||
|
int fid; // used to seek when version is changed
|
||||||
|
SDFileSet *pSet;
|
||||||
|
} SFSIter;
|
||||||
|
|
||||||
|
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
|
||||||
|
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
|
||||||
|
|
||||||
|
STsdbFS *tsdbNewFS(const STsdbCfg *pCfg);
|
||||||
|
void * tsdbFreeFS(STsdbFS *pfs);
|
||||||
|
int tsdbOpenFS(STsdb *pRepo);
|
||||||
|
void tsdbCloseFS(STsdb *pRepo);
|
||||||
|
void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd);
|
||||||
|
int tsdbEndFSTxn(STsdb *pRepo);
|
||||||
|
int tsdbEndFSTxnWithError(STsdbFS *pfs);
|
||||||
|
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta);
|
||||||
|
// void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
|
||||||
|
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
|
||||||
|
|
||||||
|
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction);
|
||||||
|
void tsdbFSIterSeek(SFSIter *pIter, int fid);
|
||||||
|
SDFileSet *tsdbFSIterNext(SFSIter *pIter);
|
||||||
|
int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta);
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) {
|
||||||
|
int code = taosThreadRwlockRdlock(&(pFs->lock));
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) {
|
||||||
|
int code = taosThreadRwlockWrlock(&(pFs->lock));
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
|
||||||
|
int code = taosThreadRwlockUnlock(&(pFs->lock));
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
// tsdbSma
|
||||||
|
// #define TSDB_SMA_TEST // remove after test finished
|
||||||
|
|
||||||
|
|
||||||
|
// struct SSmaEnv {
|
||||||
|
// TdThreadRwlock lock;
|
||||||
|
// SDiskID did;
|
||||||
|
// TDBEnv dbEnv; // TODO: If it's better to put it in smaIndex level?
|
||||||
|
// char *path; // relative path
|
||||||
|
// SSmaStat *pStat;
|
||||||
|
// };
|
||||||
|
|
||||||
|
// #define SMA_ENV_LOCK(env) ((env)->lock)
|
||||||
|
// #define SMA_ENV_DID(env) ((env)->did)
|
||||||
|
// #define SMA_ENV_ENV(env) ((env)->dbEnv)
|
||||||
|
// #define SMA_ENV_PATH(env) ((env)->path)
|
||||||
|
// #define SMA_ENV_STAT(env) ((env)->pStat)
|
||||||
|
// #define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
|
||||||
|
|
||||||
|
|
||||||
|
// void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv);
|
||||||
|
// void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv);
|
||||||
|
// #if 0
|
||||||
|
// int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result);
|
||||||
|
// int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
|
||||||
|
// #endif
|
||||||
|
|
||||||
|
// // internal func
|
||||||
|
// static FORCE_INLINE int32_t tsdbEncodeTSmaKey(int64_t groupId, TSKEY tsKey, void **pData) {
|
||||||
|
// int32_t len = 0;
|
||||||
|
// len += taosEncodeFixedI64(pData, tsKey);
|
||||||
|
// len += taosEncodeFixedI64(pData, groupId);
|
||||||
|
// return len;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// static FORCE_INLINE int32_t tsdbRLockSma(SSmaEnv *pEnv) {
|
||||||
|
// int code = taosThreadRwlockRdlock(&(pEnv->lock));
|
||||||
|
// if (code != 0) {
|
||||||
|
// terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
// return 0;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// static FORCE_INLINE int32_t tsdbWLockSma(SSmaEnv *pEnv) {
|
||||||
|
// int code = taosThreadRwlockWrlock(&(pEnv->lock));
|
||||||
|
// if (code != 0) {
|
||||||
|
// terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
// return 0;
|
||||||
|
// }
|
||||||
|
|
||||||
|
// static FORCE_INLINE int32_t tsdbUnLockSma(SSmaEnv *pEnv) {
|
||||||
|
// int code = taosThreadRwlockUnlock(&(pEnv->lock));
|
||||||
|
// if (code != 0) {
|
||||||
|
// terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
// return -1;
|
||||||
|
// }
|
||||||
|
// return 0;
|
||||||
|
// }
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
||||||
#endif /*_TD_VNODE_TSDB_H_*/
|
#endif /*_TD_VNODE_TSDB_H_*/
|
|
@ -1,79 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_TSDB_COMMIT_H_
|
|
||||||
#define _TD_TSDB_COMMIT_H_
|
|
||||||
|
|
||||||
#include "vnode.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int minFid;
|
|
||||||
int midFid;
|
|
||||||
int maxFid;
|
|
||||||
TSKEY minKey;
|
|
||||||
} SRtn;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
uint64_t uid;
|
|
||||||
int64_t offset;
|
|
||||||
int64_t size;
|
|
||||||
} SKVRecord;
|
|
||||||
|
|
||||||
void tsdbGetRtnSnap(STsdb *pRepo, SRtn *pRtn);
|
|
||||||
|
|
||||||
static FORCE_INLINE int TSDB_KEY_FID(TSKEY key, int32_t days, int8_t precision) {
|
|
||||||
if (key < 0) {
|
|
||||||
return (int)((key + 1) / tsTickPerDay[precision] / days - 1);
|
|
||||||
} else {
|
|
||||||
return (int)((key / tsTickPerDay[precision] / days));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
|
|
||||||
if (fid >= pRtn->maxFid) {
|
|
||||||
return 0;
|
|
||||||
} else if (fid >= pRtn->midFid) {
|
|
||||||
return 1;
|
|
||||||
} else if (fid >= pRtn->minFid) {
|
|
||||||
return 2;
|
|
||||||
} else {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
|
|
||||||
|
|
||||||
int tsdbEncodeKVRecord(void **buf, SKVRecord *pRecord);
|
|
||||||
void *tsdbDecodeKVRecord(void *buf, SKVRecord *pRecord);
|
|
||||||
void *tsdbCommitData(STsdbRepo *pRepo);
|
|
||||||
int tsdbApplyRtnOnFSet(STsdbRepo *pRepo, SDFileSet *pSet, SRtn *pRtn);
|
|
||||||
int tsdbWriteBlockInfoImpl(SDFile *pHeadf, STable *pTable, SArray *pSupA, SArray *pSubA, void **ppBuf, SBlockIdx *pIdx);
|
|
||||||
int tsdbWriteBlockIdx(SDFile *pHeadf, SArray *pIdxA, void **ppBuf);
|
|
||||||
int tsdbWriteBlockImpl(STsdbRepo *pRepo, STable *pTable, SDFile *pDFile, SDataCols *pDataCols, SBlock *pBlock,
|
|
||||||
bool isLast, bool isSuper, void **ppBuf, void **ppCBuf);
|
|
||||||
int tsdbApplyRtn(STsdbRepo *pRepo);
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /* _TD_TSDB_COMMIT_H_ */
|
|
|
@ -1,32 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
#if 0
|
|
||||||
|
|
||||||
#ifndef _TD_TSDB_COMPACT_H_
|
|
||||||
#define _TD_TSDB_COMPACT_H_
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
void *tsdbCompactImpl(STsdbRepo *pRepo);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /* _TD_TSDB_COMPACT_H_ */
|
|
||||||
|
|
||||||
#endif
|
|
|
@ -1,45 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_TSDB_DB_DEF_H_
|
|
||||||
#define _TD_TSDB_DB_DEF_H_
|
|
||||||
|
|
||||||
#include "db.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef struct SDBFile SDBFile;
|
|
||||||
typedef DB_ENV* TDBEnv;
|
|
||||||
|
|
||||||
struct SDBFile {
|
|
||||||
int32_t fid;
|
|
||||||
DB* pDB;
|
|
||||||
char* path;
|
|
||||||
};
|
|
||||||
|
|
||||||
int32_t tsdbOpenDBF(TDBEnv pEnv, SDBFile* pDBF);
|
|
||||||
void tsdbCloseDBF(SDBFile* pDBF);
|
|
||||||
int32_t tsdbOpenBDBEnv(DB_ENV** ppEnv, const char* path);
|
|
||||||
void tsdbCloseBDBEnv(DB_ENV* pEnv);
|
|
||||||
int32_t tsdbSaveSmaToDB(SDBFile* pDBF, void* key, uint32_t keySize, void* data, uint32_t dataSize);
|
|
||||||
void* tsdbGetSmaDataByKey(SDBFile* pDBF, void* key, uint32_t keySize, uint32_t* valueSize);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /*_TD_TSDB_DB_DEF_H_*/
|
|
|
@ -1,82 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_TSDB_DEF_H_
|
|
||||||
#define _TD_TSDB_DEF_H_
|
|
||||||
|
|
||||||
#include "tsdbDBDef.h"
|
|
||||||
#include "tmallocator.h"
|
|
||||||
#include "tcompression.h"
|
|
||||||
#include "tglobal.h"
|
|
||||||
#include "thash.h"
|
|
||||||
#include "tlist.h"
|
|
||||||
#include "tmsg.h"
|
|
||||||
#include "tskiplist.h"
|
|
||||||
#include "ttime.h"
|
|
||||||
|
|
||||||
#include "tsdbCommit.h"
|
|
||||||
#include "tsdbFS.h"
|
|
||||||
#include "tsdbFile.h"
|
|
||||||
#include "tsdbLog.h"
|
|
||||||
#include "tsdbMemTable.h"
|
|
||||||
#include "tsdbMemory.h"
|
|
||||||
#include "tsdbOptions.h"
|
|
||||||
#include "tsdbReadImpl.h"
|
|
||||||
#include "tsdbSma.h"
|
|
||||||
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
struct STsdb {
|
|
||||||
int32_t vgId;
|
|
||||||
bool repoLocked;
|
|
||||||
TdThreadMutex mutex;
|
|
||||||
char * path;
|
|
||||||
STsdbCfg config;
|
|
||||||
STsdbMemTable * mem;
|
|
||||||
STsdbMemTable * imem;
|
|
||||||
SRtn rtn;
|
|
||||||
SMemAllocatorFactory *pmaf;
|
|
||||||
STsdbFS * fs;
|
|
||||||
SMeta * pMeta;
|
|
||||||
STfs * pTfs;
|
|
||||||
SSmaEnvs smaEnvs;
|
|
||||||
};
|
|
||||||
|
|
||||||
#define REPO_ID(r) ((r)->vgId)
|
|
||||||
#define REPO_CFG(r) (&(r)->config)
|
|
||||||
#define REPO_FS(r) ((r)->fs)
|
|
||||||
#define REPO_META(r) ((r)->pMeta)
|
|
||||||
#define REPO_TFS(r) ((r)->pTfs)
|
|
||||||
#define IS_REPO_LOCKED(r) ((r)->repoLocked)
|
|
||||||
#define REPO_TSMA_NUM(r) ((r)->smaEnvs.nTSma)
|
|
||||||
#define REPO_RSMA_NUM(r) ((r)->smaEnvs.nRSma)
|
|
||||||
#define REPO_TSMA_ENV(r) ((r)->smaEnvs.pTSmaEnv)
|
|
||||||
#define REPO_RSMA_ENV(r) ((r)->smaEnvs.pRSmaEnv)
|
|
||||||
|
|
||||||
int tsdbLockRepo(STsdb *pTsdb);
|
|
||||||
int tsdbUnlockRepo(STsdb *pTsdb);
|
|
||||||
|
|
||||||
static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, bool copy, int32_t version) {
|
|
||||||
return pTable->pSchema;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /*_TD_TSDB_DEF_H_*/
|
|
|
@ -1,141 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_TSDB_FS_H_
|
|
||||||
#define _TD_TSDB_FS_H_
|
|
||||||
|
|
||||||
#include "tsdbFile.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// ================== TSDB global config
|
|
||||||
extern bool tsdbForceKeepFile;
|
|
||||||
|
|
||||||
// ================== CURRENT file header info
|
|
||||||
typedef struct {
|
|
||||||
uint32_t version; // Current file system version (relating to code)
|
|
||||||
uint32_t len; // Encode content length (including checksum)
|
|
||||||
} SFSHeader;
|
|
||||||
|
|
||||||
// ================== TSDB File System Meta
|
|
||||||
typedef struct {
|
|
||||||
uint32_t version; // Commit version from 0 to increase
|
|
||||||
int64_t totalPoints; // total points
|
|
||||||
int64_t totalStorage; // Uncompressed total storage
|
|
||||||
} STsdbFSMeta;
|
|
||||||
|
|
||||||
// ==================
|
|
||||||
typedef struct {
|
|
||||||
STsdbFSMeta meta; // FS meta
|
|
||||||
SArray * df; // data file array
|
|
||||||
SArray * sf; // sma data file array v2f1900.index_name_1
|
|
||||||
} SFSStatus;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* @brief Directory structure of .tsma data files.
|
|
||||||
*
|
|
||||||
* /vnode2/tsdb $ tree tsma/
|
|
||||||
* tsma/
|
|
||||||
* ├── v2f100.index_name_1
|
|
||||||
* ├── v2f101.index_name_1
|
|
||||||
* ├── v2f102.index_name_1
|
|
||||||
* ├── v2f1900.index_name_3
|
|
||||||
* ├── v2f1901.index_name_3
|
|
||||||
* ├── v2f1902.index_name_3
|
|
||||||
* ├── v2f200.index_name_2
|
|
||||||
* ├── v2f201.index_name_2
|
|
||||||
* └── v2f202.index_name_2
|
|
||||||
*
|
|
||||||
* 0 directories, 9 files
|
|
||||||
*/
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
TdThreadRwlock lock;
|
|
||||||
|
|
||||||
SFSStatus *cstatus; // current status
|
|
||||||
SHashObj * metaCache; // meta cache
|
|
||||||
SHashObj * metaCacheComp; // meta cache for compact
|
|
||||||
bool intxn;
|
|
||||||
SFSStatus *nstatus; // new status
|
|
||||||
} STsdbFS;
|
|
||||||
|
|
||||||
#define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus)
|
|
||||||
#define FS_NEW_STATUS(pfs) ((pfs)->nstatus)
|
|
||||||
#define FS_IN_TXN(pfs) (pfs)->intxn
|
|
||||||
#define FS_VERSION(pfs) ((pfs)->cstatus->meta.version)
|
|
||||||
#define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version)
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int direction;
|
|
||||||
uint64_t version; // current FS version
|
|
||||||
STsdbFS * pfs;
|
|
||||||
int index; // used to position next fset when version the same
|
|
||||||
int fid; // used to seek when version is changed
|
|
||||||
SDFileSet *pSet;
|
|
||||||
} SFSIter;
|
|
||||||
|
|
||||||
#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC
|
|
||||||
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
|
|
||||||
|
|
||||||
STsdbFS *tsdbNewFS(const STsdbCfg *pCfg);
|
|
||||||
void * tsdbFreeFS(STsdbFS *pfs);
|
|
||||||
int tsdbOpenFS(STsdb *pRepo);
|
|
||||||
void tsdbCloseFS(STsdb *pRepo);
|
|
||||||
void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd);
|
|
||||||
int tsdbEndFSTxn(STsdb *pRepo);
|
|
||||||
int tsdbEndFSTxnWithError(STsdbFS *pfs);
|
|
||||||
void tsdbUpdateFSTxnMeta(STsdbFS *pfs, STsdbFSMeta *pMeta);
|
|
||||||
// void tsdbUpdateMFile(STsdbFS *pfs, const SMFile *pMFile);
|
|
||||||
int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet);
|
|
||||||
|
|
||||||
void tsdbFSIterInit(SFSIter *pIter, STsdbFS *pfs, int direction);
|
|
||||||
void tsdbFSIterSeek(SFSIter *pIter, int fid);
|
|
||||||
SDFileSet *tsdbFSIterNext(SFSIter *pIter);
|
|
||||||
int tsdbLoadMetaCache(STsdb *pRepo, bool recoverMeta);
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) {
|
|
||||||
int code = taosThreadRwlockRdlock(&(pFs->lock));
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) {
|
|
||||||
int code = taosThreadRwlockWrlock(&(pFs->lock));
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
|
|
||||||
int code = taosThreadRwlockUnlock(&(pFs->lock));
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /* _TD_TSDB_FS_H_ */
|
|
|
@ -1,435 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TS_TSDB_FILE_H_
|
|
||||||
#define _TS_TSDB_FILE_H_
|
|
||||||
|
|
||||||
#include "tchecksum.h"
|
|
||||||
#include "tfs.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#define TSDB_FILE_HEAD_SIZE 512
|
|
||||||
#define TSDB_FILE_DELIMITER 0xF00AFA0F
|
|
||||||
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
|
|
||||||
#define TSDB_IVLD_FID INT_MIN
|
|
||||||
#define TSDB_FILE_STATE_OK 0
|
|
||||||
#define TSDB_FILE_STATE_BAD 1
|
|
||||||
|
|
||||||
#define TSDB_FILE_INFO(tf) (&((tf)->info))
|
|
||||||
#define TSDB_FILE_F(tf) (&((tf)->f))
|
|
||||||
#define TSDB_FILE_PFILE(tf) ((tf)->pFile)
|
|
||||||
#define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname)
|
|
||||||
#define TSDB_FILE_OPENED(tf) (TSDB_FILE_PFILE(tf) != NULL)
|
|
||||||
#define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf))
|
|
||||||
#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_PFILE(f) = NULL)
|
|
||||||
#define TSDB_FILE_LEVEL(tf) (TSDB_FILE_F(tf)->did.level)
|
|
||||||
#define TSDB_FILE_ID(tf) (TSDB_FILE_F(tf)->did.id)
|
|
||||||
#define TSDB_FILE_DID(tf) (TSDB_FILE_F(tf)->did)
|
|
||||||
#define TSDB_FILE_REL_NAME(tf) (TSDB_FILE_F(tf)->rname)
|
|
||||||
#define TSDB_FILE_ABS_NAME(tf) (TSDB_FILE_F(tf)->aname)
|
|
||||||
#define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_PFILE(tf))
|
|
||||||
#define TSDB_FILE_STATE(tf) ((tf)->state)
|
|
||||||
#define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s))
|
|
||||||
#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK)
|
|
||||||
#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD)
|
|
||||||
|
|
||||||
typedef enum {
|
|
||||||
TSDB_FILE_HEAD = 0, // .head
|
|
||||||
TSDB_FILE_DATA, // .data
|
|
||||||
TSDB_FILE_LAST, // .last
|
|
||||||
TSDB_FILE_SMAD, // .smad(Block-wise SMA)
|
|
||||||
TSDB_FILE_SMAL, // .smal(Block-wise SMA)
|
|
||||||
TSDB_FILE_MAX, //
|
|
||||||
TSDB_FILE_META, // meta
|
|
||||||
TSDB_FILE_TSMA, // v2t100.${sma_index_name}, Time-range-wise SMA
|
|
||||||
TSDB_FILE_RSMA, // v2r100.${sma_index_name}, Time-range-wise Rollup SMA
|
|
||||||
} E_TSDB_FILE_T;
|
|
||||||
|
|
||||||
typedef int32_t TSDB_FILE_T;
|
|
||||||
typedef enum {
|
|
||||||
TSDB_FS_VER_0 = 0,
|
|
||||||
TSDB_FS_VER_MAX,
|
|
||||||
} ETsdbFsVer;
|
|
||||||
|
|
||||||
#define TSDB_LATEST_FVER TSDB_FS_VER_0 // latest version for DFile
|
|
||||||
#define TSDB_LATEST_SFS_VER TSDB_FS_VER_0 // latest version for 'current' file
|
|
||||||
|
|
||||||
static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest version for DFile
|
|
||||||
switch (fType) {
|
|
||||||
case TSDB_FILE_HEAD: // .head
|
|
||||||
case TSDB_FILE_DATA: // .data
|
|
||||||
case TSDB_FILE_LAST: // .last
|
|
||||||
case TSDB_FILE_SMAD: // .smad(Block-wise SMA)
|
|
||||||
case TSDB_FILE_SMAL: // .smal(Block-wise SMA)
|
|
||||||
default:
|
|
||||||
return TSDB_LATEST_FVER;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#if 0
|
|
||||||
// =============== SMFile
|
|
||||||
typedef struct {
|
|
||||||
int64_t size;
|
|
||||||
int64_t tombSize;
|
|
||||||
int64_t nRecords;
|
|
||||||
int64_t nDels;
|
|
||||||
uint32_t magic;
|
|
||||||
} SMFInfo;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SMFInfo info;
|
|
||||||
STfsFile f;
|
|
||||||
int fd;
|
|
||||||
uint8_t state;
|
|
||||||
} SMFile;
|
|
||||||
|
|
||||||
void tsdbInitMFile(SMFile* pMFile, SDiskID did, int vid, uint32_t ver);
|
|
||||||
void tsdbInitMFileEx(SMFile* pMFile, const SMFile* pOMFile);
|
|
||||||
int tsdbEncodeSMFile(void** buf, SMFile* pMFile);
|
|
||||||
void* tsdbDecodeSMFile(void* buf, SMFile* pMFile);
|
|
||||||
int tsdbEncodeSMFileEx(void** buf, SMFile* pMFile);
|
|
||||||
void* tsdbDecodeSMFileEx(void* buf, SMFile* pMFile);
|
|
||||||
int tsdbApplyMFileChange(SMFile* from, SMFile* to);
|
|
||||||
int tsdbCreateMFile(SMFile* pMFile, bool updateHeader);
|
|
||||||
int tsdbUpdateMFileHeader(SMFile* pMFile);
|
|
||||||
int tsdbLoadMFileHeader(SMFile* pMFile, SMFInfo* pInfo);
|
|
||||||
int tsdbScanAndTryFixMFile(STsdb* pRepo);
|
|
||||||
int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo);
|
|
||||||
void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo);
|
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbSetMFileInfo(SMFile* pMFile, SMFInfo* pInfo) { pMFile->info = *pInfo; }
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbOpenMFile(SMFile* pMFile, int flags) {
|
|
||||||
ASSERT(TSDB_FILE_CLOSED(pMFile));
|
|
||||||
|
|
||||||
pMFile->fd = open(TSDB_FILE_FULL_NAME(pMFile), flags);
|
|
||||||
if (pMFile->fd < 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbCloseMFile(SMFile* pMFile) {
|
|
||||||
if (TSDB_FILE_OPENED(pMFile)) {
|
|
||||||
close(pMFile->fd);
|
|
||||||
TSDB_FILE_SET_CLOSED(pMFile);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence) {
|
|
||||||
ASSERT(TSDB_FILE_OPENED(pMFile));
|
|
||||||
|
|
||||||
int64_t loffset = taosLSeekFile(TSDB_FILE_FD(pMFile), offset, whence);
|
|
||||||
if (loffset < 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return loffset;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte) {
|
|
||||||
ASSERT(TSDB_FILE_OPENED(pMFile));
|
|
||||||
|
|
||||||
int64_t nwrite = taosWriteFile(pMFile->fd, buf, nbyte);
|
|
||||||
if (nwrite < nbyte) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return nwrite;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbUpdateMFileMagic(SMFile* pMFile, void* pCksum) {
|
|
||||||
pMFile->info.magic = taosCalcChecksum(pMFile->info.magic, (uint8_t*)(pCksum), sizeof(TSCKSUM));
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbAppendMFile(SMFile* pMFile, void* buf, int64_t nbyte, int64_t* offset) {
|
|
||||||
ASSERT(TSDB_FILE_OPENED(pMFile));
|
|
||||||
|
|
||||||
int64_t toffset;
|
|
||||||
|
|
||||||
if ((toffset = tsdbSeekMFile(pMFile, 0, SEEK_END)) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pMFile->info.size == toffset);
|
|
||||||
|
|
||||||
if (offset) {
|
|
||||||
*offset = toffset;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsdbWriteMFile(pMFile, buf, nbyte) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pMFile->info.size += nbyte;
|
|
||||||
|
|
||||||
return (int)nbyte;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbRemoveMFile(SMFile* pMFile) { return tfsremove(TSDB_FILE_F(pMFile)); }
|
|
||||||
|
|
||||||
static FORCE_INLINE int64_t tsdbReadMFile(SMFile* pMFile, void* buf, int64_t nbyte) {
|
|
||||||
ASSERT(TSDB_FILE_OPENED(pMFile));
|
|
||||||
|
|
||||||
int64_t nread = taosReadFile(pMFile->fd, buf, nbyte);
|
|
||||||
if (nread < 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return nread;
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// =============== SDFile
|
|
||||||
typedef struct {
|
|
||||||
uint32_t magic;
|
|
||||||
uint32_t fver;
|
|
||||||
uint32_t len;
|
|
||||||
uint32_t totalBlocks;
|
|
||||||
uint32_t totalSubBlocks;
|
|
||||||
uint32_t offset;
|
|
||||||
uint64_t size;
|
|
||||||
uint64_t tombSize;
|
|
||||||
} SDFInfo;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SDFInfo info;
|
|
||||||
STfsFile f;
|
|
||||||
TdFilePtr pFile;
|
|
||||||
uint8_t state;
|
|
||||||
} SDFile;
|
|
||||||
|
|
||||||
void tsdbInitDFile(STsdb *pRepo, SDFile* pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype);
|
|
||||||
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
|
|
||||||
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
|
|
||||||
void* tsdbDecodeSDFile(STsdb *pRepo, void* buf, SDFile* pDFile);
|
|
||||||
int tsdbCreateDFile(STsdb *pRepo, SDFile* pDFile, bool updateHeader, TSDB_FILE_T fType);
|
|
||||||
int tsdbUpdateDFileHeader(SDFile* pDFile);
|
|
||||||
int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo);
|
|
||||||
int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version);
|
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbSetDFileInfo(SDFile* pDFile, SDFInfo* pInfo) { pDFile->info = *pInfo; }
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbOpenDFile(SDFile* pDFile, int flags) {
|
|
||||||
ASSERT(!TSDB_FILE_OPENED(pDFile));
|
|
||||||
|
|
||||||
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), flags);
|
|
||||||
if (pDFile->pFile == NULL) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbCloseDFile(SDFile* pDFile) {
|
|
||||||
if (TSDB_FILE_OPENED(pDFile)) {
|
|
||||||
taosCloseFile(&pDFile->pFile);
|
|
||||||
TSDB_FILE_SET_CLOSED(pDFile);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence) {
|
|
||||||
// ASSERT(TSDB_FILE_OPENED(pDFile));
|
|
||||||
|
|
||||||
int64_t loffset = taosLSeekFile(TSDB_FILE_PFILE(pDFile), offset, whence);
|
|
||||||
if (loffset < 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return loffset;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
|
|
||||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
|
||||||
|
|
||||||
int64_t nwrite = taosWriteFile(pDFile->pFile, buf, nbyte);
|
|
||||||
if (nwrite < nbyte) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return nwrite;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm) {
|
|
||||||
pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t*)(pCksm), sizeof(TSCKSUM));
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset) {
|
|
||||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
|
||||||
|
|
||||||
int64_t toffset;
|
|
||||||
|
|
||||||
if ((toffset = tsdbSeekDFile(pDFile, 0, SEEK_END)) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pDFile->info.size == toffset);
|
|
||||||
|
|
||||||
if (offset) {
|
|
||||||
*offset = toffset;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (tsdbWriteDFile(pDFile, buf, nbyte) < 0) {
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
pDFile->info.size += nbyte;
|
|
||||||
|
|
||||||
return (int)nbyte;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsRemoveFile(TSDB_FILE_F(pDFile)); }
|
|
||||||
|
|
||||||
static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) {
|
|
||||||
ASSERT(TSDB_FILE_OPENED(pDFile));
|
|
||||||
|
|
||||||
int64_t nread = taosReadFile(pDFile->pFile, buf, nbyte);
|
|
||||||
if (nread < 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
return nread;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) {
|
|
||||||
if (tfsCopyFile(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(errno);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
|
|
||||||
tsdbSetDFileInfo(pDest, TSDB_FILE_INFO(pSrc));
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// =============== SDFileSet
|
|
||||||
typedef struct {
|
|
||||||
int fid;
|
|
||||||
int8_t state; // -128~127
|
|
||||||
uint8_t ver; // 0~255, DFileSet version
|
|
||||||
uint16_t reserve;
|
|
||||||
SDFile files[TSDB_FILE_MAX];
|
|
||||||
} SDFileSet;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int fid;
|
|
||||||
int8_t state;
|
|
||||||
uint8_t ver;
|
|
||||||
uint16_t reserve;
|
|
||||||
#if 0
|
|
||||||
SDFInfo info;
|
|
||||||
#endif
|
|
||||||
STfsFile f;
|
|
||||||
TdFilePtr pFile;
|
|
||||||
|
|
||||||
} SSFile; // files split by days with fid
|
|
||||||
|
|
||||||
#define TSDB_LATEST_FSET_VER 0
|
|
||||||
|
|
||||||
#define TSDB_FSET_FID(s) ((s)->fid)
|
|
||||||
#define TSDB_FSET_STATE(s) ((s)->state)
|
|
||||||
#define TSDB_FSET_VER(s) ((s)->ver)
|
|
||||||
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
|
|
||||||
#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0))
|
|
||||||
#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0))
|
|
||||||
#define TSDB_FSET_SET_CLOSED(s) \
|
|
||||||
do { \
|
|
||||||
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \
|
|
||||||
TSDB_FILE_SET_CLOSED(TSDB_DFILE_IN_SET(s, ftype)); \
|
|
||||||
} \
|
|
||||||
} while (0);
|
|
||||||
#define TSDB_FSET_FSYNC(s) \
|
|
||||||
do { \
|
|
||||||
for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \
|
|
||||||
TSDB_FILE_FSYNC(TSDB_DFILE_IN_SET(s, ftype)); \
|
|
||||||
} \
|
|
||||||
} while (0);
|
|
||||||
|
|
||||||
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet* pSet, SDiskID did, int fid, uint32_t ver);
|
|
||||||
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet);
|
|
||||||
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet);
|
|
||||||
void* tsdbDecodeDFileSet(STsdb *pRepo, void* buf, SDFileSet* pSet);
|
|
||||||
int tsdbEncodeDFileSetEx(void** buf, SDFileSet* pSet);
|
|
||||||
void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet);
|
|
||||||
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to);
|
|
||||||
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet* pSet, bool updateHeader);
|
|
||||||
int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
|
|
||||||
int tsdbScanAndTryFixDFileSet(STsdb* pRepo, SDFileSet* pSet);
|
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) {
|
|
||||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
|
||||||
tsdbCloseDFile(TSDB_DFILE_IN_SET(pSet, ftype));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) {
|
|
||||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
|
||||||
if (tsdbOpenDFile(TSDB_DFILE_IN_SET(pSet, ftype), flags) < 0) {
|
|
||||||
tsdbCloseDFileSet(pSet);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbRemoveDFileSet(SDFileSet* pSet) {
|
|
||||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
|
||||||
(void)tsdbRemoveDFile(TSDB_DFILE_IN_SET(pSet, ftype));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) {
|
|
||||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
|
||||||
if (tsdbCopyDFile(TSDB_DFILE_IN_SET(pSrc, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) {
|
|
||||||
tsdbRemoveDFileSet(pDest);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY* minKey, TSKEY* maxKey) {
|
|
||||||
*minKey = fid * days * tsTickPerDay[precision];
|
|
||||||
*maxKey = *minKey + days * tsTickPerDay[precision] - 1;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) {
|
|
||||||
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
|
|
||||||
if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return true;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /* _TS_TSDB_FILE_H_ */
|
|
|
@ -1,38 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_TSDB_LOG_H_
|
|
||||||
#define _TD_TSDB_LOG_H_
|
|
||||||
|
|
||||||
#include "tlog.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
extern int32_t tsdbDebugFlag;
|
|
||||||
|
|
||||||
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
|
|
||||||
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
|
|
||||||
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
|
|
||||||
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
|
|
||||||
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
|
||||||
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /* _TD_TSDB_LOG_H_ */
|
|
|
@ -1,81 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_TSDB_MEM_TABLE_H_
|
|
||||||
#define _TD_TSDB_MEM_TABLE_H_
|
|
||||||
|
|
||||||
#include "vnode.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int rowsInserted;
|
|
||||||
int rowsUpdated;
|
|
||||||
int rowsDeleteSucceed;
|
|
||||||
int rowsDeleteFailed;
|
|
||||||
int nOperations;
|
|
||||||
TSKEY keyFirst;
|
|
||||||
TSKEY keyLast;
|
|
||||||
} SMergeInfo;
|
|
||||||
|
|
||||||
typedef struct STbData {
|
|
||||||
tb_uid_t uid;
|
|
||||||
TSKEY keyMin;
|
|
||||||
TSKEY keyMax;
|
|
||||||
int64_t nrows;
|
|
||||||
SSkipList *pData;
|
|
||||||
} STbData;
|
|
||||||
|
|
||||||
typedef struct STsdbMemTable {
|
|
||||||
T_REF_DECLARE()
|
|
||||||
SRWLatch latch;
|
|
||||||
TSKEY keyMin;
|
|
||||||
TSKEY keyMax;
|
|
||||||
uint64_t nRow;
|
|
||||||
SMemAllocator *pMA;
|
|
||||||
// Container
|
|
||||||
SSkipList *pSlIdx; // SSkiplist<STbData>
|
|
||||||
SHashObj * pHashIdx;
|
|
||||||
} STsdbMemTable;
|
|
||||||
|
|
||||||
STsdbMemTable *tsdbNewMemTable(STsdb *pTsdb);
|
|
||||||
void tsdbFreeMemTable(STsdb *pTsdb, STsdbMemTable *pMemTable);
|
|
||||||
int tsdbMemTableInsert(STsdb *pTsdb, STsdbMemTable *pMemTable, SSubmitReq *pMsg, SSubmitRsp *pRsp);
|
|
||||||
int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCols *pCols,
|
|
||||||
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
|
|
||||||
|
|
||||||
static FORCE_INLINE STSRow *tsdbNextIterRow(SSkipListIterator *pIter) {
|
|
||||||
if (pIter == NULL) return NULL;
|
|
||||||
|
|
||||||
SSkipListNode *node = tSkipListIterGet(pIter);
|
|
||||||
if (node == NULL) return NULL;
|
|
||||||
|
|
||||||
return (STSRow *)SL_GET_NODE_DATA(node);
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator *pIter) {
|
|
||||||
STSRow *row = tsdbNextIterRow(pIter);
|
|
||||||
if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL;
|
|
||||||
|
|
||||||
return TD_ROW_KEY(row);
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /*_TD_TSDB_MEM_TABLE_H_*/
|
|
|
@ -1,81 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_TSDB_MEMORY_H_
|
|
||||||
#define _TD_TSDB_MEMORY_H_
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
static void * taosTMalloc(size_t size);
|
|
||||||
static void * taosTCalloc(size_t nmemb, size_t size);
|
|
||||||
static void * taosTRealloc(void *ptr, size_t size);
|
|
||||||
static void * taosTZfree(void *ptr);
|
|
||||||
static size_t taosTSizeof(void *ptr);
|
|
||||||
static void taosTMemset(void *ptr, int c);
|
|
||||||
|
|
||||||
static FORCE_INLINE void *taosTMalloc(size_t size) {
|
|
||||||
if (size <= 0) return NULL;
|
|
||||||
|
|
||||||
void *ret = taosMemoryMalloc(size + sizeof(size_t));
|
|
||||||
if (ret == NULL) return NULL;
|
|
||||||
|
|
||||||
*(size_t *)ret = size;
|
|
||||||
|
|
||||||
return (void *)((char *)ret + sizeof(size_t));
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) {
|
|
||||||
size_t tsize = nmemb * size;
|
|
||||||
void * ret = taosTMalloc(tsize);
|
|
||||||
if (ret == NULL) return NULL;
|
|
||||||
|
|
||||||
taosTMemset(ret, 0);
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)((char *)ptr - sizeof(size_t))) : 0; }
|
|
||||||
|
|
||||||
static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); }
|
|
||||||
|
|
||||||
static FORCE_INLINE void * taosTRealloc(void *ptr, size_t size) {
|
|
||||||
if (ptr == NULL) return taosTMalloc(size);
|
|
||||||
|
|
||||||
if (size <= taosTSizeof(ptr)) return ptr;
|
|
||||||
|
|
||||||
void * tptr = (void *)((char *)ptr - sizeof(size_t));
|
|
||||||
size_t tsize = size + sizeof(size_t);
|
|
||||||
void* tptr1 = taosMemoryRealloc(tptr, tsize);
|
|
||||||
if (tptr1 == NULL) return NULL;
|
|
||||||
tptr = tptr1;
|
|
||||||
|
|
||||||
*(size_t *)tptr = size;
|
|
||||||
|
|
||||||
return (void *)((char *)tptr + sizeof(size_t));
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE void* taosTZfree(void* ptr) {
|
|
||||||
if (ptr) {
|
|
||||||
taosMemoryFree((void*)((char*)ptr - sizeof(size_t)));
|
|
||||||
}
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /* _TD_TSDB_MEMORY_H_ */
|
|
|
@ -1,32 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_TSDB_OPTIONS_H_
|
|
||||||
#define _TD_TSDB_OPTIONS_H_
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
extern const STsdbCfg defautlTsdbOptions;
|
|
||||||
|
|
||||||
int tsdbValidateOptions(const STsdbCfg *);
|
|
||||||
void tsdbOptionsCopy(STsdbCfg *pDest, const STsdbCfg *pSrc);
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /*_TD_TSDB_OPTIONS_H_*/
|
|
|
@ -1,256 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_TSDB_READ_IMPL_H_
|
|
||||||
#define _TD_TSDB_READ_IMPL_H_
|
|
||||||
|
|
||||||
#include "os.h"
|
|
||||||
#include "tcommon.h"
|
|
||||||
#include "tfs.h"
|
|
||||||
#include "vnode.h"
|
|
||||||
#include "tsdbFile.h"
|
|
||||||
#include "tsdbMemory.h"
|
|
||||||
#include "tskiplist.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef struct SReadH SReadH;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
uint32_t len;
|
|
||||||
uint32_t offset;
|
|
||||||
uint32_t hasLast : 2;
|
|
||||||
uint32_t numOfBlocks : 30;
|
|
||||||
uint64_t uid;
|
|
||||||
TSKEY maxKey;
|
|
||||||
} SBlockIdx;
|
|
||||||
|
|
||||||
#ifdef TD_REFACTOR_3
|
|
||||||
typedef struct {
|
|
||||||
int64_t last : 1;
|
|
||||||
int64_t offset : 63;
|
|
||||||
int32_t algorithm : 8;
|
|
||||||
int32_t numOfRows : 24;
|
|
||||||
int32_t len;
|
|
||||||
int32_t keyLen; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
|
|
||||||
int16_t numOfSubBlocks;
|
|
||||||
int16_t numOfCols; // not including timestamp column
|
|
||||||
TSKEY keyFirst;
|
|
||||||
TSKEY keyLast;
|
|
||||||
} SBlock;
|
|
||||||
|
|
||||||
#else
|
|
||||||
|
|
||||||
typedef enum {
|
|
||||||
TSDB_SBLK_VER_0 = 0,
|
|
||||||
TSDB_SBLK_VER_MAX,
|
|
||||||
} ESBlockVer;
|
|
||||||
|
|
||||||
#define SBlockVerLatest TSDB_SBLK_VER_0
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
uint8_t last : 1;
|
|
||||||
uint8_t blkVer : 7;
|
|
||||||
uint8_t numOfSubBlocks;
|
|
||||||
col_id_t numOfCols; // not including timestamp column
|
|
||||||
uint32_t len; // data block length
|
|
||||||
uint32_t keyLen : 20; // key column length, keyOffset = offset+sizeof(SBlockData)+sizeof(SBlockCol)*numOfCols
|
|
||||||
uint32_t algorithm : 4;
|
|
||||||
uint32_t reserve : 8;
|
|
||||||
col_id_t numOfBSma;
|
|
||||||
uint16_t numOfRows;
|
|
||||||
int64_t offset;
|
|
||||||
uint64_t aggrStat : 1;
|
|
||||||
uint64_t aggrOffset : 63;
|
|
||||||
TSKEY keyFirst;
|
|
||||||
TSKEY keyLast;
|
|
||||||
} SBlockV0;
|
|
||||||
|
|
||||||
#define SBlock SBlockV0 // latest SBlock definition
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int32_t delimiter; // For recovery usage
|
|
||||||
int32_t tid;
|
|
||||||
uint64_t uid;
|
|
||||||
SBlock blocks[];
|
|
||||||
} SBlockInfo;
|
|
||||||
|
|
||||||
#ifdef TD_REFACTOR_3
|
|
||||||
typedef struct {
|
|
||||||
int16_t colId;
|
|
||||||
uint16_t bitmap : 1; // 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
|
|
||||||
uint16_t reserve : 15;
|
|
||||||
int32_t len;
|
|
||||||
uint32_t type : 8;
|
|
||||||
uint32_t offset : 24;
|
|
||||||
int64_t sum;
|
|
||||||
int64_t max;
|
|
||||||
int64_t min;
|
|
||||||
int16_t maxIndex;
|
|
||||||
int16_t minIndex;
|
|
||||||
int16_t numOfNull;
|
|
||||||
uint8_t offsetH;
|
|
||||||
char padding[1];
|
|
||||||
} SBlockCol;
|
|
||||||
#else
|
|
||||||
typedef struct {
|
|
||||||
int16_t colId;
|
|
||||||
uint16_t type : 6;
|
|
||||||
uint16_t blen : 10; // bitmap length(TODO: full UT for the bitmap compress of various data input)
|
|
||||||
uint32_t bitmap : 1; // 0: has bitmap if has NULL/NORM rows, 1: no bitmap if all rows are NORM
|
|
||||||
uint32_t len : 31; // data length + bitmap length
|
|
||||||
uint32_t offset;
|
|
||||||
} SBlockColV0;
|
|
||||||
|
|
||||||
#define SBlockCol SBlockColV0 // latest SBlockCol definition
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int16_t colId;
|
|
||||||
int16_t maxIndex;
|
|
||||||
int16_t minIndex;
|
|
||||||
int16_t numOfNull;
|
|
||||||
int64_t sum;
|
|
||||||
int64_t max;
|
|
||||||
int64_t min;
|
|
||||||
} SAggrBlkColV0;
|
|
||||||
|
|
||||||
#define SAggrBlkCol SAggrBlkColV0 // latest SAggrBlkCol definition
|
|
||||||
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// Code here just for back-ward compatibility
|
|
||||||
static FORCE_INLINE void tsdbSetBlockColOffset(SBlockCol *pBlockCol, uint32_t offset) {
|
|
||||||
#ifdef TD_REFACTOR_3
|
|
||||||
pBlockCol->offset = offset & ((((uint32_t)1) << 24) - 1);
|
|
||||||
pBlockCol->offsetH = (uint8_t)(offset >> 24);
|
|
||||||
#else
|
|
||||||
pBlockCol->offset = offset;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE uint32_t tsdbGetBlockColOffset(SBlockCol *pBlockCol) {
|
|
||||||
#ifdef TD_REFACTOR_3
|
|
||||||
uint32_t offset1 = pBlockCol->offset;
|
|
||||||
uint32_t offset2 = pBlockCol->offsetH;
|
|
||||||
return (offset1 | (offset2 << 24));
|
|
||||||
#else
|
|
||||||
return pBlockCol->offset;
|
|
||||||
#endif
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int32_t delimiter; // For recovery usage
|
|
||||||
int32_t numOfCols; // For recovery usage
|
|
||||||
uint64_t uid; // For recovery usage
|
|
||||||
SBlockCol cols[];
|
|
||||||
} SBlockData;
|
|
||||||
|
|
||||||
typedef void SAggrBlkData; // SBlockCol cols[];
|
|
||||||
|
|
||||||
struct SReadH {
|
|
||||||
STsdb *pRepo;
|
|
||||||
SDFileSet rSet; // FSET to read
|
|
||||||
SArray *aBlkIdx; // SBlockIdx array
|
|
||||||
STable *pTable; // table to read
|
|
||||||
SBlockIdx *pBlkIdx; // current reading table SBlockIdx
|
|
||||||
int cidx;
|
|
||||||
SBlockInfo *pBlkInfo;
|
|
||||||
SBlockData *pBlkData; // Block info
|
|
||||||
SAggrBlkData *pAggrBlkData; // Aggregate Block info
|
|
||||||
SDataCols *pDCols[2];
|
|
||||||
void *pBuf; // buffer
|
|
||||||
void *pCBuf; // compression buffer
|
|
||||||
void *pExBuf; // extra buffer
|
|
||||||
};
|
|
||||||
|
|
||||||
#define TSDB_READ_REPO(rh) ((rh)->pRepo)
|
|
||||||
#define TSDB_READ_REPO_ID(rh) REPO_ID(TSDB_READ_REPO(rh))
|
|
||||||
#define TSDB_READ_FSET(rh) (&((rh)->rSet))
|
|
||||||
#define TSDB_READ_TABLE(rh) ((rh)->pTable)
|
|
||||||
#define TSDB_READ_HEAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_HEAD)
|
|
||||||
#define TSDB_READ_DATA_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_DATA)
|
|
||||||
#define TSDB_READ_LAST_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_LAST)
|
|
||||||
#define TSDB_READ_SMAD_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAD)
|
|
||||||
#define TSDB_READ_SMAL_FILE(rh) TSDB_DFILE_IN_SET(TSDB_READ_FSET(rh), TSDB_FILE_SMAL)
|
|
||||||
#define TSDB_READ_BUF(rh) ((rh)->pBuf)
|
|
||||||
#define TSDB_READ_COMP_BUF(rh) ((rh)->pCBuf)
|
|
||||||
#define TSDB_READ_EXBUF(rh) ((rh)->pExBuf)
|
|
||||||
|
|
||||||
#define TSDB_BLOCK_STATIS_SIZE(ncols, blkVer) \
|
|
||||||
(sizeof(SBlockData) + sizeof(SBlockColV##blkVer) * (ncols) + sizeof(TSCKSUM))
|
|
||||||
|
|
||||||
static FORCE_INLINE size_t tsdbBlockStatisSize(int nCols, uint32_t blkVer) {
|
|
||||||
switch (blkVer) {
|
|
||||||
case TSDB_SBLK_VER_0:
|
|
||||||
default:
|
|
||||||
return TSDB_BLOCK_STATIS_SIZE(nCols, 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#define TSDB_BLOCK_AGGR_SIZE(ncols, blkVer) (sizeof(SAggrBlkColV##blkVer) * (ncols) + sizeof(TSCKSUM))
|
|
||||||
|
|
||||||
static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) {
|
|
||||||
switch (blkVer) {
|
|
||||||
case TSDB_SBLK_VER_0:
|
|
||||||
default:
|
|
||||||
return TSDB_BLOCK_AGGR_SIZE(nCols, 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo);
|
|
||||||
void tsdbDestroyReadH(SReadH *pReadh);
|
|
||||||
int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet);
|
|
||||||
void tsdbCloseAndUnsetFSet(SReadH *pReadh);
|
|
||||||
int tsdbLoadBlockIdx(SReadH *pReadh);
|
|
||||||
int tsdbSetReadTable(SReadH *pReadh, STable *pTable);
|
|
||||||
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget);
|
|
||||||
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo);
|
|
||||||
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds,
|
|
||||||
int numOfColsIds);
|
|
||||||
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
|
|
||||||
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
|
|
||||||
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
|
|
||||||
void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock);
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
|
|
||||||
void *pBuf = *ppBuf;
|
|
||||||
size_t tsize = taosTSizeof(pBuf);
|
|
||||||
|
|
||||||
if (tsize < size) {
|
|
||||||
if (tsize == 0) tsize = 1024;
|
|
||||||
|
|
||||||
while (tsize < size) {
|
|
||||||
tsize *= 2;
|
|
||||||
}
|
|
||||||
|
|
||||||
*ppBuf = taosTRealloc(pBuf, tsize);
|
|
||||||
if (*ppBuf == NULL) {
|
|
||||||
terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /*_TD_TSDB_READ_IMPL_H_*/
|
|
|
@ -1,89 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_TSDB_SMA_H_
|
|
||||||
#define _TD_TSDB_SMA_H_
|
|
||||||
|
|
||||||
#define TSDB_SMA_TEST // remove after test finished
|
|
||||||
|
|
||||||
typedef struct SSmaStat SSmaStat;
|
|
||||||
typedef struct SSmaEnv SSmaEnv;
|
|
||||||
typedef struct SSmaEnvs SSmaEnvs;
|
|
||||||
|
|
||||||
struct SSmaEnv {
|
|
||||||
TdThreadRwlock lock;
|
|
||||||
SDiskID did;
|
|
||||||
TDBEnv dbEnv; // TODO: If it's better to put it in smaIndex level?
|
|
||||||
char *path; // relative path
|
|
||||||
SSmaStat *pStat;
|
|
||||||
};
|
|
||||||
|
|
||||||
#define SMA_ENV_LOCK(env) ((env)->lock)
|
|
||||||
#define SMA_ENV_DID(env) ((env)->did)
|
|
||||||
#define SMA_ENV_ENV(env) ((env)->dbEnv)
|
|
||||||
#define SMA_ENV_PATH(env) ((env)->path)
|
|
||||||
#define SMA_ENV_STAT(env) ((env)->pStat)
|
|
||||||
#define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
|
|
||||||
|
|
||||||
struct SSmaEnvs {
|
|
||||||
int16_t nTSma;
|
|
||||||
int16_t nRSma;
|
|
||||||
SSmaEnv *pTSmaEnv;
|
|
||||||
SSmaEnv *pRSmaEnv;
|
|
||||||
};
|
|
||||||
|
|
||||||
void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv);
|
|
||||||
void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv);
|
|
||||||
#if 0
|
|
||||||
int32_t tsdbGetTSmaStatus(STsdb *pTsdb, STSma *param, void *result);
|
|
||||||
int32_t tsdbRemoveTSmaData(STsdb *pTsdb, STSma *param, STimeWindow *pWin);
|
|
||||||
#endif
|
|
||||||
|
|
||||||
// internal func
|
|
||||||
static FORCE_INLINE int32_t tsdbEncodeTSmaKey(int64_t groupId, TSKEY tsKey, void **pData) {
|
|
||||||
int32_t len = 0;
|
|
||||||
len += taosEncodeFixedI64(pData, tsKey);
|
|
||||||
len += taosEncodeFixedI64(pData, groupId);
|
|
||||||
return len;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tsdbRLockSma(SSmaEnv *pEnv) {
|
|
||||||
int code = taosThreadRwlockRdlock(&(pEnv->lock));
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tsdbWLockSma(SSmaEnv *pEnv) {
|
|
||||||
int code = taosThreadRwlockWrlock(&(pEnv->lock));
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tsdbUnLockSma(SSmaEnv *pEnv) {
|
|
||||||
int code = taosThreadRwlockUnlock(&(pEnv->lock));
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
#endif /* _TD_TSDB_SMA_H_ */
|
|
|
@ -17,7 +17,6 @@
|
||||||
#define _TD_VNODE_DEF_H_
|
#define _TD_VNODE_DEF_H_
|
||||||
|
|
||||||
#include "tmallocator.h"
|
#include "tmallocator.h"
|
||||||
// #include "sync.h"
|
|
||||||
#include "tcoding.h"
|
#include "tcoding.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "tfs.h"
|
#include "tfs.h"
|
||||||
|
@ -27,6 +26,11 @@
|
||||||
#include "vnode.h"
|
#include "vnode.h"
|
||||||
#include "vnodeQuery.h"
|
#include "vnodeQuery.h"
|
||||||
#include "wal.h"
|
#include "wal.h"
|
||||||
|
#include "tskiplist.h"
|
||||||
|
#include "tchecksum.h"
|
||||||
|
#include "tglobal.h"
|
||||||
|
#include "ttime.h"
|
||||||
|
#include "tcompression.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
|
|
@ -83,9 +83,9 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t versi
|
||||||
memcpy(data, msg, msgLen);
|
memcpy(data, msg, msgLen);
|
||||||
|
|
||||||
if (msgType == TDMT_VND_SUBMIT) {
|
if (msgType == TDMT_VND_SUBMIT) {
|
||||||
if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg) != 0) {
|
// if (tsdbUpdateSmaWindow(pTq->pVnode->pTsdb, msg) != 0) {
|
||||||
return -1;
|
// return -1;
|
||||||
}
|
// }
|
||||||
}
|
}
|
||||||
|
|
||||||
SRpcMsg req = {
|
SRpcMsg req = {
|
||||||
|
|
|
@ -19,8 +19,6 @@
|
||||||
#include "taoserror.h"
|
#include "taoserror.h"
|
||||||
#include "tcoding.h"
|
#include "tcoding.h"
|
||||||
#include "thash.h"
|
#include "thash.h"
|
||||||
#include "tsdbDBDef.h"
|
|
||||||
#include "tsdbLog.h"
|
|
||||||
|
|
||||||
#define IMPL_WITH_LOCK 1
|
#define IMPL_WITH_LOCK 1
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdbDef.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
#define TSDB_MAX_SUBBLOCKS 8
|
#define TSDB_MAX_SUBBLOCKS 8
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <regex.h>
|
#include <regex.h>
|
||||||
#include "tsdbDef.h"
|
#include "vnodeInt.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
|
|
||||||
typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T;
|
typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T;
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdbDef.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
static const char *TSDB_FNAME_SUFFIX[] = {
|
static const char *TSDB_FNAME_SUFFIX[] = {
|
||||||
"head", // TSDB_FILE_HEAD
|
"head", // TSDB_FILE_HEAD
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdbDef.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
|
static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF,
|
||||||
SMeta *pMeta, STfs *pTfs);
|
SMeta *pMeta, STfs *pTfs);
|
||||||
|
@ -87,8 +87,8 @@ static STsdb *tsdbNew(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg,
|
||||||
|
|
||||||
static void tsdbFree(STsdb *pTsdb) {
|
static void tsdbFree(STsdb *pTsdb) {
|
||||||
if (pTsdb) {
|
if (pTsdb) {
|
||||||
tsdbFreeSmaEnv(REPO_TSMA_ENV(pTsdb));
|
// tsdbFreeSmaEnv(REPO_TSMA_ENV(pTsdb));
|
||||||
tsdbFreeSmaEnv(REPO_RSMA_ENV(pTsdb));
|
// tsdbFreeSmaEnv(REPO_RSMA_ENV(pTsdb));
|
||||||
tsdbFreeFS(pTsdb->fs);
|
tsdbFreeFS(pTsdb->fs);
|
||||||
taosMemoryFreeClear(pTsdb->path);
|
taosMemoryFreeClear(pTsdb->path);
|
||||||
taosMemoryFree(pTsdb);
|
taosMemoryFree(pTsdb);
|
||||||
|
@ -98,7 +98,7 @@ static void tsdbFree(STsdb *pTsdb) {
|
||||||
static int tsdbOpenImpl(STsdb *pTsdb) {
|
static int tsdbOpenImpl(STsdb *pTsdb) {
|
||||||
tsdbOpenFS(pTsdb);
|
tsdbOpenFS(pTsdb);
|
||||||
|
|
||||||
tsdbInitSma(pTsdb);
|
// tsdbInitSma(pTsdb);
|
||||||
// TODO
|
// TODO
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdbDef.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
|
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
|
||||||
static int tsdbMemTableInsertTbData(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *pAffectedRows);
|
static int tsdbMemTableInsertTbData(STsdb *pRepo, SSubmitBlk *pBlock, int32_t *pAffectedRows);
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdbDef.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
const STsdbCfg defautlTsdbOptions = {.precision = 0,
|
const STsdbCfg defautlTsdbOptions = {.precision = 0,
|
||||||
.lruCacheSize = 0,
|
.lruCacheSize = 0,
|
||||||
|
|
|
@ -13,25 +13,18 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdbDef.h"
|
#include "vnodeInt.h"
|
||||||
#include "tdatablock.h"
|
#include "tdatablock.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "talgo.h"
|
#include "talgo.h"
|
||||||
#include "tcompare.h"
|
#include "tcompare.h"
|
||||||
#include "tdataformat.h"
|
#include "tdataformat.h"
|
||||||
#include "texception.h"
|
#include "texception.h"
|
||||||
#include "vnode.h"
|
|
||||||
#include "tsdbFS.h"
|
|
||||||
#include "tsdbLog.h"
|
|
||||||
#include "tsdbReadImpl.h"
|
|
||||||
#include "tskiplist.h"
|
|
||||||
#include "ttime.h"
|
|
||||||
|
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
#include "tlosertree.h"
|
#include "tlosertree.h"
|
||||||
#include "tsdbDef.h"
|
#include "vnodeInt.h"
|
||||||
#include "tmsg.h"
|
#include "tmsg.h"
|
||||||
#include "tsdbCommit.h"
|
|
||||||
|
|
||||||
#define EXTRA_BYTES 2
|
#define EXTRA_BYTES 2
|
||||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdbDef.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
#define TSDB_KEY_COL_OFFSET 0
|
#define TSDB_KEY_COL_OFFSET 0
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdbDef.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
static const char *TSDB_SMA_DNAME[] = {
|
static const char *TSDB_SMA_DNAME[] = {
|
||||||
"", // TSDB_SMA_TYPE_BLOCK
|
"", // TSDB_SMA_TYPE_BLOCK
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdbDef.h"
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief insert TS data
|
* @brief insert TS data
|
||||||
|
|
|
@ -18,8 +18,8 @@
|
||||||
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||||
// TODO
|
// TODO
|
||||||
|
|
||||||
blockDebugShowData(data);
|
// blockDebugShowData(data);
|
||||||
tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data);
|
// tsdbInsertTSmaData(((SVnode *)pVnode)->pTsdb, smaId, (const char *)data);
|
||||||
}
|
}
|
||||||
|
|
||||||
void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
||||||
|
@ -232,16 +232,16 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
tdDestroyTSma(&vCreateSmaReq.tSma);
|
tdDestroyTSma(&vCreateSmaReq.tSma);
|
||||||
// TODO: return directly or go on follow steps?
|
// TODO: return directly or go on follow steps?
|
||||||
#endif
|
#endif
|
||||||
if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
// if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
||||||
// TODO
|
// // TODO
|
||||||
}
|
// }
|
||||||
} break;
|
// } break;
|
||||||
case TDMT_VND_CANCEL_SMA: { // timeRangeSMA
|
// case TDMT_VND_CANCEL_SMA: { // timeRangeSMA
|
||||||
} break;
|
// } break;
|
||||||
case TDMT_VND_DROP_SMA: { // timeRangeSMA
|
// case TDMT_VND_DROP_SMA: { // timeRangeSMA
|
||||||
if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
// if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
|
||||||
// TODO
|
// // TODO
|
||||||
}
|
// }
|
||||||
#if 0
|
#if 0
|
||||||
tsdbTSmaSub(pVnode->pTsdb, 1);
|
tsdbTSmaSub(pVnode->pTsdb, 1);
|
||||||
SVDropTSmaReq vDropSmaReq = {0};
|
SVDropTSmaReq vDropSmaReq = {0};
|
||||||
|
|
|
@ -25,15 +25,15 @@ AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||||
# COMMAND tqTest
|
# COMMAND tqTest
|
||||||
# )
|
# )
|
||||||
|
|
||||||
ADD_EXECUTABLE(tsdbSmaTest tsdbSmaTest.cpp)
|
# ADD_EXECUTABLE(tsdbSmaTest tsdbSmaTest.cpp)
|
||||||
TARGET_LINK_LIBRARIES(
|
# TARGET_LINK_LIBRARIES(
|
||||||
tsdbSmaTest
|
# tsdbSmaTest
|
||||||
PUBLIC os util common vnode gtest_main
|
# PUBLIC os util common vnode gtest_main
|
||||||
)
|
# )
|
||||||
|
|
||||||
TARGET_INCLUDE_DIRECTORIES(
|
# TARGET_INCLUDE_DIRECTORIES(
|
||||||
tsdbSmaTest
|
# tsdbSmaTest
|
||||||
PUBLIC "${CMAKE_SOURCE_DIR}/include/common"
|
# PUBLIC "${CMAKE_SOURCE_DIR}/include/common"
|
||||||
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../src/inc"
|
# PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../src/inc"
|
||||||
PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
# PUBLIC "${CMAKE_CURRENT_SOURCE_DIR}/../inc"
|
||||||
)
|
# )
|
|
@ -14,7 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include <gtest/gtest.h>
|
#include <gtest/gtest.h>
|
||||||
#include <tsdbDef.h>
|
#include <vnodeInt.h>
|
||||||
|
|
||||||
#include <taoserror.h>
|
#include <taoserror.h>
|
||||||
#include <tglobal.h>
|
#include <tglobal.h>
|
||||||
|
|
Loading…
Reference in New Issue