refact vnode

This commit is contained in:
Hongze Cheng 2022-04-14 02:53:10 +00:00
parent bb6806b59e
commit f80768efb3
4 changed files with 222 additions and 247 deletions

View File

@ -61,33 +61,14 @@ int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
// meta // meta
typedef struct SMeta SMeta; // todo: remove typedef struct SMeta SMeta; // todo: remove
typedef struct SMTbCursor SMTbCursor; // todo: remove typedef struct SMTbCursor SMTbCursor;
typedef struct SMCtbCursor SMCtbCursor; // todo: remove
typedef struct SMSmaCursor SMSmaCursor; // todo: remove
#define META_SUPER_TABLE TD_SUPER_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
typedef SVCreateTbReq STbCfg; typedef SVCreateTbReq STbCfg;
typedef SVCreateTSmaReq SSmaCfg; typedef SVCreateTSmaReq SSmaCfg;
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode);
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid);
SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup);
int metaGetTbNum(SMeta *pMeta);
SMTbCursor *metaOpenTbCursor(SMeta *pMeta); SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur); void metaCloseTbCursor(SMTbCursor *pTbCur);
char *metaTbCursorNext(SMTbCursor *pTbCur); char *metaTbCursorNext(SMTbCursor *pTbCur);
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid);
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur);
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid);
void metaCloseSmaCursor(SMSmaCursor *pSmaCur);
int64_t metaSmaCursorNext(SMSmaCursor *pSmaCur);
// tsdb // tsdb
typedef struct STsdb STsdb; typedef struct STsdb STsdb;
@ -98,18 +79,7 @@ typedef void *tsdbReaderT;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 #define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3 #define BLOCK_LOAD_TABLE_RR_ORDER 3
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta,
STfs *pTfs);
void tsdbClose(STsdb *);
void tsdbRemove(const char *path);
int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp);
int tsdbPrepareCommit(STsdb *pTsdb);
int tsdbCommit(STsdb *pTsdb);
int32_t tsdbInitSma(STsdb *pTsdb);
int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg);
int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg);
tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
uint64_t taskId); uint64_t taskId);
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId,
@ -127,18 +97,8 @@ SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumn
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo); int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version);
int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg);
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
// tq // tq
enum {
TQ_STREAM_TOKEN__DATA = 1,
TQ_STREAM_TOKEN__WATERMARK,
TQ_STREAM_TOKEN__CHECKPOINT,
};
typedef struct STqReadHandle STqReadHandle; typedef struct STqReadHandle STqReadHandle;
@ -202,21 +162,6 @@ struct SVnodeCfg {
int8_t hashMethod; int8_t hashMethod;
}; };
struct STqReadHandle {
int64_t ver;
int64_t tbUid;
SHashObj *tbIdHash;
const SSubmitReq *pMsg;
SSubmitBlk *pBlock;
SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter;
SMeta *pVnodeMeta;
SArray *pColIdList; // SArray<int32_t>
int32_t sver;
SSchemaWrapper *pSchemaWrapper;
STSchema *pSchema;
};
struct SDataStatis { struct SDataStatis {
int16_t colId; int16_t colId;
int16_t maxIndex; int16_t maxIndex;
@ -241,22 +186,6 @@ typedef struct {
uint64_t uid; uint64_t uid;
} STableKeyInfo; } STableKeyInfo;
typedef struct STable {
uint64_t tid;
uint64_t uid;
STSchema *pSchema;
} STable;
typedef struct {
int8_t type;
int8_t reserved[7];
union {
void *data;
int64_t wmTs;
int64_t checkpointId;
};
} STqStreamToken;
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -23,6 +23,22 @@ extern "C" {
typedef struct SMetaCache SMetaCache; typedef struct SMetaCache SMetaCache;
typedef struct SMetaIdx SMetaIdx; typedef struct SMetaIdx SMetaIdx;
typedef struct SMetaDB SMetaDB; typedef struct SMetaDB SMetaDB;
typedef struct SMCtbCursor SMCtbCursor;
typedef struct SMSmaCursor SMSmaCursor;
// metaDebug ==================
// clang-format off
#define metaFatal(...) do { if (metaDebugFlag & DEBUG_FATAL) { taosPrintLog("META FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define metaError(...) do { if (metaDebugFlag & DEBUG_ERROR) { taosPrintLog("META ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define metaWarn(...) do { if (metaDebugFlag & DEBUG_WARN) { taosPrintLog("META WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define metaInfo(...) do { if (metaDebugFlag & DEBUG_INFO) { taosPrintLog("META ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define metaDebug(...) do { if (metaDebugFlag & DEBUG_DEBUG) { taosPrintLog("META ", DEBUG_DEBUG, metaDebugFlag, __VA_ARGS__); }} while(0)
#define metaTrace(...) do { if (metaDebugFlag & DEBUG_TRACE) { taosPrintLog("META ", DEBUG_TRACE, metaDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
#define META_SUPER_TABLE TD_SUPER_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
SMeta* metaOpen(const char* path, const SMetaCfg* pMetaCfg, SMemAllocatorFactory* pMAF); SMeta* metaOpen(const char* path, const SMetaCfg* pMetaCfg, SMemAllocatorFactory* pMAF);
void metaClose(SMeta* pMeta); void metaClose(SMeta* pMeta);
@ -34,6 +50,18 @@ int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
STbCfg* metaGetTbInfoByUid(SMeta* pMeta, tb_uid_t uid); STbCfg* metaGetTbInfoByUid(SMeta* pMeta, tb_uid_t uid);
STbCfg* metaGetTbInfoByName(SMeta* pMeta, char* tbname, tb_uid_t* uid); STbCfg* metaGetTbInfoByName(SMeta* pMeta, char* tbname, tb_uid_t* uid);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
void* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid, bool isDecode);
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid);
SArray* metaGetSmaTbUids(SMeta* pMeta, bool isDup);
int metaGetTbNum(SMeta* pMeta);
SMSmaCursor* metaOpenSmaCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseSmaCursor(SMSmaCursor* pSmaCur);
int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur);
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid);
void metaCloseCtbCurosr(SMCtbCursor* pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
// SMetaDB // SMetaDB
int metaOpenDB(SMeta* pMeta); int metaOpenDB(SMeta* pMeta);

View File

@ -20,48 +20,21 @@
extern "C" { extern "C" {
#endif #endif
// tqInt.h // tqDebug ===================
#define tqFatal(...) \ // clang-format off
{ \ #define tqFatal(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
if (tqDebugFlag & DEBUG_FATAL) { \ #define tqError(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \ #define tqWarn(...) do { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
} \ #define tqInfo(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
} #define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
#define tqError(...) \ enum {
{ \ TQ_STREAM_TOKEN__DATA = 1,
if (tqDebugFlag & DEBUG_ERROR) { \ TQ_STREAM_TOKEN__WATERMARK,
taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \ TQ_STREAM_TOKEN__CHECKPOINT,
} \ };
}
#define tqWarn(...) \
{ \
if (tqDebugFlag & DEBUG_WARN) { \
taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
} \
}
#define tqInfo(...) \
{ \
if (tqDebugFlag & DEBUG_INFO) { \
taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); \
} \
}
#define tqDebug(...) \
{ \
if (tqDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); \
} \
}
#define tqTrace(...) \
{ \
if (tqDebugFlag & DEBUG_TRACE) { \
taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); \
} \
}
#define TQ_BUFFER_SIZE 4 #define TQ_BUFFER_SIZE 4
@ -105,6 +78,31 @@ typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus;
typedef struct STqOffsetCfg STqOffsetCfg; typedef struct STqOffsetCfg STqOffsetCfg;
typedef struct STqOffsetStore STqOffsetStore; typedef struct STqOffsetStore STqOffsetStore;
struct STqReadHandle {
int64_t ver;
int64_t tbUid;
SHashObj* tbIdHash;
const SSubmitReq* pMsg;
SSubmitBlk* pBlock;
SSubmitMsgIter msgIter;
SSubmitBlkIter blkIter;
SMeta* pVnodeMeta;
SArray* pColIdList; // SArray<int32_t>
int32_t sver;
SSchemaWrapper* pSchemaWrapper;
STSchema* pSchema;
};
typedef struct {
int8_t type;
int8_t reserved[7];
union {
void* data;
int64_t wmTs;
int64_t checkpointId;
};
} STqStreamToken;
typedef struct { typedef struct {
int16_t ver; int16_t ver;
int16_t action; int16_t action;
@ -248,7 +246,6 @@ typedef struct {
static STqPushMgmt tqPushMgmt; static STqPushMgmt tqPushMgmt;
int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**); int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**);
int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**); int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**);

View File

@ -20,10 +20,45 @@
extern "C" { extern "C" {
#endif #endif
// tsdbDebug ================
// clang-format off
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
typedef struct SSmaStat SSmaStat; typedef struct SSmaStat SSmaStat;
typedef struct SSmaEnv SSmaEnv; typedef struct SSmaEnv SSmaEnv;
typedef struct SSmaEnvs SSmaEnvs; typedef struct SSmaEnvs SSmaEnvs;
typedef struct STable {
uint64_t tid;
uint64_t uid;
STSchema *pSchema;
} STable;
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta,
STfs *pTfs);
void tsdbClose(STsdb *);
void tsdbRemove(const char *path);
int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp);
int tsdbPrepareCommit(STsdb *pTsdb);
int tsdbCommit(STsdb *pTsdb);
int32_t tsdbInitSma(STsdb *pTsdb);
int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg);
int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg);
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version);
int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg);
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
typedef enum { typedef enum {
TSDB_FILE_HEAD = 0, // .head TSDB_FILE_HEAD = 0, // .head
TSDB_FILE_DATA, // .data TSDB_FILE_DATA, // .data
@ -93,7 +128,7 @@ typedef struct STsdbMemTable {
SMemAllocator *pMA; SMemAllocator *pMA;
// Container // Container
SSkipList *pSlIdx; // SSkiplist<STbData> SSkipList *pSlIdx; // SSkiplist<STbData>
SHashObj * pHashIdx; SHashObj *pHashIdx;
} STsdbMemTable; } STsdbMemTable;
typedef struct { typedef struct {
@ -105,16 +140,16 @@ typedef struct {
// ================== // ==================
typedef struct { typedef struct {
STsdbFSMeta meta; // FS meta STsdbFSMeta meta; // FS meta
SArray * df; // data file array SArray *df; // data file array
SArray * sf; // sma data file array v2f1900.index_name_1 SArray *sf; // sma data file array v2f1900.index_name_1
} SFSStatus; } SFSStatus;
typedef struct { typedef struct {
TdThreadRwlock lock; TdThreadRwlock lock;
SFSStatus *cstatus; // current status SFSStatus *cstatus; // current status
SHashObj * metaCache; // meta cache SHashObj *metaCache; // meta cache
SHashObj * metaCacheComp; // meta cache for compact SHashObj *metaCacheComp; // meta cache for compact
bool intxn; bool intxn;
SFSStatus *nstatus; // new status SFSStatus *nstatus; // new status
} STsdbFS; } STsdbFS;
@ -123,15 +158,15 @@ struct STsdb {
int32_t vgId; int32_t vgId;
bool repoLocked; bool repoLocked;
TdThreadMutex mutex; TdThreadMutex mutex;
char * path; char *path;
STsdbCfg config; STsdbCfg config;
STsdbMemTable * mem; STsdbMemTable *mem;
STsdbMemTable * imem; STsdbMemTable *imem;
SRtn rtn; SRtn rtn;
SMemAllocatorFactory *pmaf; SMemAllocatorFactory *pmaf;
STsdbFS * fs; STsdbFS *fs;
SMeta * pMeta; SMeta *pMeta;
STfs * pTfs; STfs *pTfs;
SSmaEnvs smaEnvs; SSmaEnvs smaEnvs;
}; };
@ -153,16 +188,6 @@ static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock,
return pTable->pSchema; return pTable->pSchema;
} }
// tsdbLog
extern int32_t tsdbDebugFlag;
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0)
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// tsdbMemTable.h // tsdbMemTable.h
typedef struct { typedef struct {
int rowsInserted; int rowsInserted;
@ -174,10 +199,10 @@ typedef struct {
TSKEY keyLast; TSKEY keyLast;
} SMergeInfo; } SMergeInfo;
static void * taosTMalloc(size_t size); static void *taosTMalloc(size_t size);
static void * taosTCalloc(size_t nmemb, size_t size); static void *taosTCalloc(size_t nmemb, size_t size);
static void * taosTRealloc(void *ptr, size_t size); static void *taosTRealloc(void *ptr, size_t size);
static void * taosTZfree(void *ptr); static void *taosTZfree(void *ptr);
static size_t taosTSizeof(void *ptr); static size_t taosTSizeof(void *ptr);
static void taosTMemset(void *ptr, int c); static void taosTMemset(void *ptr, int c);
@ -406,8 +431,8 @@ int tsdbLoadBlockIdx(SReadH *pReadh);
int tsdbSetReadTable(SReadH *pReadh, STable *pTable); int tsdbSetReadTable(SReadH *pReadh, STable *pTable);
int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget);
int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo); int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo);
int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds,
int numOfColsIds, bool mergeBitmap); bool mergeBitmap);
int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock);
int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx);
void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx);
@ -448,7 +473,7 @@ static FORCE_INLINE void *taosTMalloc(size_t size) {
static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) { static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) {
size_t tsize = nmemb * size; size_t tsize = nmemb * size;
void * ret = taosTMalloc(tsize); void *ret = taosTMalloc(tsize);
if (ret == NULL) return NULL; if (ret == NULL) return NULL;
taosTMemset(ret, 0); taosTMemset(ret, 0);
@ -459,14 +484,14 @@ static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)(
static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); } static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); }
static FORCE_INLINE void * taosTRealloc(void *ptr, size_t size) { static FORCE_INLINE void *taosTRealloc(void *ptr, size_t size) {
if (ptr == NULL) return taosTMalloc(size); if (ptr == NULL) return taosTMalloc(size);
if (size <= taosTSizeof(ptr)) return ptr; if (size <= taosTSizeof(ptr)) return ptr;
void * tptr = (void *)((char *)ptr - sizeof(size_t)); void *tptr = (void *)((char *)ptr - sizeof(size_t));
size_t tsize = size + sizeof(size_t); size_t tsize = size + sizeof(size_t);
void* tptr1 = taosMemoryRealloc(tptr, tsize); void *tptr1 = taosMemoryRealloc(tptr, tsize);
if (tptr1 == NULL) return NULL; if (tptr1 == NULL) return NULL;
tptr = tptr1; tptr = tptr1;
@ -475,9 +500,9 @@ static FORCE_INLINE void * taosTRealloc(void *ptr, size_t size) {
return (void *)((char *)tptr + sizeof(size_t)); return (void *)((char *)tptr + sizeof(size_t));
} }
static FORCE_INLINE void* taosTZfree(void* ptr) { static FORCE_INLINE void *taosTZfree(void *ptr) {
if (ptr) { if (ptr) {
taosMemoryFree((void*)((char*)ptr - sizeof(size_t))); taosMemoryFree((void *)((char *)ptr - sizeof(size_t)));
} }
return NULL; return NULL;
} }
@ -576,19 +601,18 @@ static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest v
} }
} }
void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype);
void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile);
int tsdbEncodeSDFile(void **buf, SDFile *pDFile);
void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile);
int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType);
int tsdbUpdateDFileHeader(SDFile *pDFile);
int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo);
int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype, uint32_t *version);
void tsdbInitDFile(STsdb *pRepo, SDFile* pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype); static FORCE_INLINE void tsdbSetDFileInfo(SDFile *pDFile, SDFInfo *pInfo) { pDFile->info = *pInfo; }
void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile);
int tsdbEncodeSDFile(void** buf, SDFile* pDFile);
void* tsdbDecodeSDFile(STsdb *pRepo, void* buf, SDFile* pDFile);
int tsdbCreateDFile(STsdb *pRepo, SDFile* pDFile, bool updateHeader, TSDB_FILE_T fType);
int tsdbUpdateDFileHeader(SDFile* pDFile);
int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo);
int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version);
static FORCE_INLINE void tsdbSetDFileInfo(SDFile* pDFile, SDFInfo* pInfo) { pDFile->info = *pInfo; } static FORCE_INLINE int tsdbOpenDFile(SDFile *pDFile, int flags) {
static FORCE_INLINE int tsdbOpenDFile(SDFile* pDFile, int flags) {
ASSERT(!TSDB_FILE_OPENED(pDFile)); ASSERT(!TSDB_FILE_OPENED(pDFile));
pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), flags); pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), flags);
@ -600,14 +624,14 @@ static FORCE_INLINE int tsdbOpenDFile(SDFile* pDFile, int flags) {
return 0; return 0;
} }
static FORCE_INLINE void tsdbCloseDFile(SDFile* pDFile) { static FORCE_INLINE void tsdbCloseDFile(SDFile *pDFile) {
if (TSDB_FILE_OPENED(pDFile)) { if (TSDB_FILE_OPENED(pDFile)) {
taosCloseFile(&pDFile->pFile); taosCloseFile(&pDFile->pFile);
TSDB_FILE_SET_CLOSED(pDFile); TSDB_FILE_SET_CLOSED(pDFile);
} }
} }
static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence) { static FORCE_INLINE int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence) {
// ASSERT(TSDB_FILE_OPENED(pDFile)); // ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t loffset = taosLSeekFile(TSDB_FILE_PFILE(pDFile), offset, whence); int64_t loffset = taosLSeekFile(TSDB_FILE_PFILE(pDFile), offset, whence);
@ -619,7 +643,7 @@ static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int wh
return loffset; return loffset;
} }
static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte) { static FORCE_INLINE int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile)); ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nwrite = taosWriteFile(pDFile->pFile, buf, nbyte); int64_t nwrite = taosWriteFile(pDFile->pFile, buf, nbyte);
@ -631,11 +655,11 @@ static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nb
return nwrite; return nwrite;
} }
static FORCE_INLINE void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm) { static FORCE_INLINE void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm) {
pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t*)(pCksm), sizeof(TSCKSUM)); pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM));
} }
static FORCE_INLINE int tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset) { static FORCE_INLINE int tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset) {
ASSERT(TSDB_FILE_OPENED(pDFile)); ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t toffset; int64_t toffset;
@ -659,9 +683,9 @@ static FORCE_INLINE int tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte
return (int)nbyte; return (int)nbyte;
} }
static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsRemoveFile(TSDB_FILE_F(pDFile)); } static FORCE_INLINE int tsdbRemoveDFile(SDFile *pDFile) { return tfsRemoveFile(TSDB_FILE_F(pDFile)); }
static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) { static FORCE_INLINE int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte) {
ASSERT(TSDB_FILE_OPENED(pDFile)); ASSERT(TSDB_FILE_OPENED(pDFile));
int64_t nread = taosReadFile(pDFile->pFile, buf, nbyte); int64_t nread = taosReadFile(pDFile->pFile, buf, nbyte);
@ -673,7 +697,7 @@ static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nby
return nread; return nread;
} }
static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) { static FORCE_INLINE int tsdbCopyDFile(SDFile *pSrc, SDFile *pDest) {
if (tfsCopyFile(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) { if (tfsCopyFile(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) {
terrno = TAOS_SYSTEM_ERROR(errno); terrno = TAOS_SYSTEM_ERROR(errno);
return -1; return -1;
@ -719,24 +743,24 @@ typedef struct {
} \ } \
} while (0); } while (0);
void tsdbInitDFileSet(STsdb *pRepo, SDFileSet* pSet, SDiskID did, int fid, uint32_t ver); void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver);
void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet); void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet);
int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet); int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet);
void* tsdbDecodeDFileSet(STsdb *pRepo, void* buf, SDFileSet* pSet); void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet);
int tsdbEncodeDFileSetEx(void** buf, SDFileSet* pSet); int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet);
void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet); void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet);
int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to); int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to);
int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet* pSet, bool updateHeader); int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader);
int tsdbUpdateDFileSetHeader(SDFileSet* pSet); int tsdbUpdateDFileSetHeader(SDFileSet *pSet);
int tsdbScanAndTryFixDFileSet(STsdb* pRepo, SDFileSet* pSet); int tsdbScanAndTryFixDFileSet(STsdb *pRepo, SDFileSet *pSet);
static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) { static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
tsdbCloseDFile(TSDB_DFILE_IN_SET(pSet, ftype)); tsdbCloseDFile(TSDB_DFILE_IN_SET(pSet, ftype));
} }
} }
static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) { static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet *pSet, int flags) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbOpenDFile(TSDB_DFILE_IN_SET(pSet, ftype), flags) < 0) { if (tsdbOpenDFile(TSDB_DFILE_IN_SET(pSet, ftype), flags) < 0) {
tsdbCloseDFileSet(pSet); tsdbCloseDFileSet(pSet);
@ -746,13 +770,13 @@ static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) {
return 0; return 0;
} }
static FORCE_INLINE void tsdbRemoveDFileSet(SDFileSet* pSet) { static FORCE_INLINE void tsdbRemoveDFileSet(SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
(void)tsdbRemoveDFile(TSDB_DFILE_IN_SET(pSet, ftype)); (void)tsdbRemoveDFile(TSDB_DFILE_IN_SET(pSet, ftype));
} }
} }
static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) { static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (tsdbCopyDFile(TSDB_DFILE_IN_SET(pSrc, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) { if (tsdbCopyDFile(TSDB_DFILE_IN_SET(pSrc, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) {
tsdbRemoveDFileSet(pDest); tsdbRemoveDFileSet(pDest);
@ -763,12 +787,12 @@ static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) {
return 0; return 0;
} }
static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY* minKey, TSKEY* maxKey) { static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey) {
*minKey = fid * days * tsTickPerDay[precision]; *minKey = fid * days * tsTickPerDay[precision];
*maxKey = *minKey + days * tsTickPerDay[precision] - 1; *maxKey = *minKey + days * tsTickPerDay[precision] - 1;
} }
static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) { static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet *pSet) {
for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) {
if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) { if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) {
return false; return false;
@ -817,7 +841,7 @@ typedef struct {
typedef struct { typedef struct {
int direction; int direction;
uint64_t version; // current FS version uint64_t version; // current FS version
STsdbFS * pfs; STsdbFS *pfs;
int index; // used to position next fset when version the same int index; // used to position next fset when version the same
int fid; // used to seek when version is changed int fid; // used to seek when version is changed
SDFileSet *pSet; SDFileSet *pSet;
@ -827,7 +851,7 @@ typedef struct {
#define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC #define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC
STsdbFS *tsdbNewFS(const STsdbCfg *pCfg); STsdbFS *tsdbNewFS(const STsdbCfg *pCfg);
void * tsdbFreeFS(STsdbFS *pfs); void *tsdbFreeFS(STsdbFS *pfs);
int tsdbOpenFS(STsdb *pRepo); int tsdbOpenFS(STsdb *pRepo);
void tsdbCloseFS(STsdb *pRepo); void tsdbCloseFS(STsdb *pRepo);
void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd); void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd);
@ -872,7 +896,6 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
// tsdbSma // tsdbSma
// #define TSDB_SMA_TEST // remove after test finished // #define TSDB_SMA_TEST // remove after test finished
// struct SSmaEnv { // struct SSmaEnv {
// TdThreadRwlock lock; // TdThreadRwlock lock;
// SDiskID did; // SDiskID did;
@ -888,7 +911,6 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
// #define SMA_ENV_STAT(env) ((env)->pStat) // #define SMA_ENV_STAT(env) ((env)->pStat)
// #define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems) // #define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems)
// void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv); // void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv);
// void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv); // void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv);
// #if 0 // #if 0
@ -935,5 +957,4 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
} }
#endif #endif
#endif /*_TD_VNODE_TSDB_H_*/ #endif /*_TD_VNODE_TSDB_H_*/