refact
This commit is contained in:
parent
409a2d122f
commit
2764ea1fc6
|
@ -0,0 +1,51 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_TSDB_BUFFER_H_
|
||||||
|
#define _TD_TSDB_BUFFER_H_
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t blockId;
|
||||||
|
int offset;
|
||||||
|
int remain;
|
||||||
|
char data[];
|
||||||
|
} STsdbBufBlock;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
pthread_cond_t poolNotEmpty;
|
||||||
|
int bufBlockSize;
|
||||||
|
int tBufBlocks;
|
||||||
|
int nBufBlocks;
|
||||||
|
int64_t index;
|
||||||
|
SList* bufBlockList;
|
||||||
|
} STsdbBufPool;
|
||||||
|
|
||||||
|
#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
|
||||||
|
|
||||||
|
STsdbBufPool* tsdbNewBufPool();
|
||||||
|
void tsdbFreeBufPool(STsdbBufPool* pBufPool);
|
||||||
|
int tsdbOpenBufPool(STsdbRepo* pRepo);
|
||||||
|
void tsdbCloseBufPool(STsdbRepo* pRepo);
|
||||||
|
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* _TD_TSDB_BUFFER_H_ */
|
|
@ -0,0 +1,29 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_TSDB_COMMIT_H_
|
||||||
|
#define _TD_TSDB_COMMIT_H_
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
void *tsdbCommitData(STsdbRepo *pRepo);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* _TD_TSDB_COMMIT_H_ */
|
|
@ -0,0 +1,29 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_TSDB_COMMIT_QUEUE_H_
|
||||||
|
#define _TD_TSDB_COMMIT_QUEUE_H_
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
int tsdbScheduleCommit(STsdbRepo *pRepo);
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* _TD_TSDB_COMMIT_QUEUE_H_ */
|
|
@ -0,0 +1,100 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_TSDB_FS_H_
|
||||||
|
#define _TD_TSDB_FS_H_
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t fsversion; // file system version, related to program
|
||||||
|
int64_t version;
|
||||||
|
int64_t totalPoints;
|
||||||
|
int64_t totalStorage;
|
||||||
|
} STsdbFSMeta;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int64_t version;
|
||||||
|
STsdbFSMeta meta;
|
||||||
|
SMFile mf; // meta file
|
||||||
|
SArray* df; // data file array
|
||||||
|
} SFSVer;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
pthread_rwlock_t lock;
|
||||||
|
|
||||||
|
SFSVer fsv;
|
||||||
|
} STsdbFS;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int version; // current FS version
|
||||||
|
int index;
|
||||||
|
int fid;
|
||||||
|
SDFileSet* pSet;
|
||||||
|
} SFSIter;
|
||||||
|
|
||||||
|
#define TSDB_FILE_INFO(tf) (&((tf)->info))
|
||||||
|
#define TSDB_FILE_F(tf) (&((tf)->f)))
|
||||||
|
#define TSDB_FILE_FD(tf) ((tf)->fd)
|
||||||
|
|
||||||
|
int tsdbOpenFS(STsdbRepo* pRepo);
|
||||||
|
void tsdbCloseFS(STsdbRepo* pRepo);
|
||||||
|
int tsdbFSNewTxn(STsdbRepo* pRepo);
|
||||||
|
int tsdbFSEndTxn(STsdbRepo* pRepo, bool hasError);
|
||||||
|
int tsdbUpdateMFile(STsdbRepo* pRepo, SMFile* pMFile);
|
||||||
|
int tsdbUpdateDFileSet(STsdbRepo* pRepo, SDFileSet* pSet);
|
||||||
|
void tsdbRemoveExpiredDFileSet(STsdbRepo* pRepo, int mfid);
|
||||||
|
int tsdbRemoveDFileSet(SDFileSet* pSet);
|
||||||
|
int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo);
|
||||||
|
void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo);
|
||||||
|
SDFileSet tsdbMoveDFileSet(SDFileSet* pOldSet, int to);
|
||||||
|
int tsdbInitFSIter(STsdbRepo* pRepo, SFSIter* pIter);
|
||||||
|
SDFileSet* tsdbFSIterNext(SFSIter* pIter);
|
||||||
|
int tsdbCreateDFileSet(int fid, int level, SDFileSet* pSet);
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) {
|
||||||
|
int code = pthread_rwlock_rdlock(&(pFs->lock));
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) {
|
||||||
|
int code = pthread_rwlock_wrlock(&(pFs->lock));
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
|
||||||
|
int code = pthread_rwlock_unlock(&(pFs->lock));
|
||||||
|
if (code != 0) {
|
||||||
|
terrno = TAOS_SYSTEM_ERROR(code);
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* _TD_TSDB_FS_H_ */
|
|
@ -0,0 +1,111 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TS_TSDB_FILE_H_
|
||||||
|
#define _TS_TSDB_FILE_H_
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#define TSDB_FILE_HEAD_SIZE 512
|
||||||
|
#define TSDB_FILE_DELIMITER 0xF00AFA0F
|
||||||
|
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
|
||||||
|
|
||||||
|
typedef enum {
|
||||||
|
TSDB_FILE_HEAD = 0,
|
||||||
|
TSDB_FILE_DATA,
|
||||||
|
TSDB_FILE_LAST,
|
||||||
|
TSDB_FILE_MAX,
|
||||||
|
TSDB_FILE_META,
|
||||||
|
TSDB_FILE_MANIFEST
|
||||||
|
} TSDB_FILE_T;
|
||||||
|
|
||||||
|
// For meta file
|
||||||
|
typedef struct {
|
||||||
|
int64_t size;
|
||||||
|
int64_t tombSize;
|
||||||
|
int64_t nRecords;
|
||||||
|
int64_t nDels;
|
||||||
|
uint32_t magic;
|
||||||
|
} SMFInfo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SMFInfo info;
|
||||||
|
TFILE f;
|
||||||
|
int fd;
|
||||||
|
} SMFile;
|
||||||
|
|
||||||
|
void tsdbInitMFile(SMFile* pMFile, int vid, int ver, SMFInfo* pInfo);
|
||||||
|
int tsdbOpenMFile(SMFile* pMFile, int flags);
|
||||||
|
void tsdbCloseMFile(SMFile* pMFile);
|
||||||
|
int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence);
|
||||||
|
int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte);
|
||||||
|
int64_t tsdbTellMFile(SMFile *pMFile);
|
||||||
|
int tsdbEncodeMFile(void** buf, SMFile* pMFile);
|
||||||
|
void* tsdbDecodeMFile(void* buf, SMFile* pMFile);
|
||||||
|
|
||||||
|
// For .head/.data/.last file
|
||||||
|
typedef struct {
|
||||||
|
uint32_t magic;
|
||||||
|
uint32_t len;
|
||||||
|
uint32_t totalBlocks;
|
||||||
|
uint32_t totalSubBlocks;
|
||||||
|
uint32_t offset;
|
||||||
|
uint64_t size;
|
||||||
|
uint64_t tombSize;
|
||||||
|
} SDFInfo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
SDFInfo info;
|
||||||
|
TFILE f;
|
||||||
|
int fd;
|
||||||
|
} SDFile;
|
||||||
|
|
||||||
|
void tsdbInitDFile(SDFile* pDFile, int vid, int fid, int ver, int level, int id, const SDFInfo* pInfo,
|
||||||
|
TSDB_FILE_T ftype);
|
||||||
|
void tsdbInitDFileWithOld(SDFile* pDFile, SDFile* pOldDFile);
|
||||||
|
int tsdbOpenDFile(SDFile* pDFile, int flags);
|
||||||
|
void tsdbCloseDFile(SDFile* pDFile);
|
||||||
|
int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence);
|
||||||
|
int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte);
|
||||||
|
int64_t tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset);
|
||||||
|
int64_t tsdbTellDFile(SDFile* pDFile);
|
||||||
|
int tsdbEncodeDFile(void** buf, SDFile* pDFile);
|
||||||
|
void* tsdbDecodeDFile(void* buf, SDFile* pDFile);
|
||||||
|
void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm);
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int fid;
|
||||||
|
int state;
|
||||||
|
SDFile files[TSDB_FILE_MAX];
|
||||||
|
} SDFileSet;
|
||||||
|
|
||||||
|
#define TSDB_FILE_FULL_NAME(f) TFILE_NAME(&((f)->f))
|
||||||
|
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
|
||||||
|
|
||||||
|
void tsdbInitDFileSet(SDFileSet* pSet, int vid, int fid, int ver, int level, int id);
|
||||||
|
void tsdbInitDFileSetWithOld(SDFileSet *pSet, SDFileSet *pOldSet);
|
||||||
|
int tsdbOpenDFileSet(SDFileSet* pSet, int flags);
|
||||||
|
void tsdbCloseDFileSet(SDFileSet* pSet);
|
||||||
|
int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
|
||||||
|
int tsdbCopyDFileSet(SDFileSet* pFromSet, SDFileSet* pToSet);
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* _TS_TSDB_FILE_H_ */
|
|
@ -0,0 +1,36 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_TSDB_LOG_H_
|
||||||
|
#define _TD_TSDB_LOG_H_
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
extern int32_t tsdbDebugFlag;
|
||||||
|
|
||||||
|
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0)
|
||||||
|
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0)
|
||||||
|
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0)
|
||||||
|
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0)
|
||||||
|
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* _TD_TSDB_LOG_H_ */
|
|
@ -36,14 +36,6 @@ extern "C" {
|
||||||
typedef struct STsdbRepo STsdbRepo;
|
typedef struct STsdbRepo STsdbRepo;
|
||||||
|
|
||||||
// ================= tsdbLog.h
|
// ================= tsdbLog.h
|
||||||
extern int32_t tsdbDebugFlag;
|
|
||||||
|
|
||||||
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0)
|
|
||||||
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0)
|
|
||||||
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0)
|
|
||||||
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0)
|
|
||||||
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
|
||||||
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
|
||||||
|
|
||||||
// ================= OTHERS
|
// ================= OTHERS
|
||||||
|
|
||||||
|
@ -55,404 +47,13 @@ extern int32_t tsdbDebugFlag;
|
||||||
|
|
||||||
// Definitions
|
// Definitions
|
||||||
// ================= tsdbMeta.c
|
// ================= tsdbMeta.c
|
||||||
#define TSDB_MAX_TABLE_SCHEMAS 16
|
|
||||||
|
|
||||||
typedef struct STable {
|
|
||||||
STableId tableId;
|
|
||||||
ETableType type;
|
|
||||||
tstr* name; // NOTE: there a flexible string here
|
|
||||||
uint64_t suid;
|
|
||||||
struct STable* pSuper; // super table pointer
|
|
||||||
uint8_t numOfSchemas;
|
|
||||||
STSchema* schema[TSDB_MAX_TABLE_SCHEMAS];
|
|
||||||
STSchema* tagSchema;
|
|
||||||
SKVRow tagVal;
|
|
||||||
SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
|
|
||||||
void* eventHandler; // TODO
|
|
||||||
void* streamHandler; // TODO
|
|
||||||
TSKEY lastKey;
|
|
||||||
SDataRow lastRow;
|
|
||||||
char* sql;
|
|
||||||
void* cqhandle;
|
|
||||||
SRWLatch latch; // TODO: implementa latch functions
|
|
||||||
T_REF_DECLARE()
|
|
||||||
} STable;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
pthread_rwlock_t rwLock;
|
|
||||||
|
|
||||||
int32_t nTables;
|
|
||||||
int32_t maxTables;
|
|
||||||
STable** tables;
|
|
||||||
SList* superList;
|
|
||||||
SHashObj* uidMap;
|
|
||||||
SKVStore* pStore;
|
|
||||||
int maxRowBytes;
|
|
||||||
int maxCols;
|
|
||||||
} STsdbMeta;
|
|
||||||
|
|
||||||
#define TSDB_INIT_NTABLES 1024
|
|
||||||
#define TABLE_TYPE(t) (t)->type
|
|
||||||
#define TABLE_NAME(t) (t)->name
|
|
||||||
#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data
|
|
||||||
#define TABLE_UID(t) (t)->tableId.uid
|
|
||||||
#define TABLE_TID(t) (t)->tableId.tid
|
|
||||||
#define TABLE_SUID(t) (t)->suid
|
|
||||||
#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore)
|
|
||||||
#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch))
|
|
||||||
#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch))
|
|
||||||
#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch))
|
|
||||||
#define TSDB_WUNLOCK_TABLE(t) taosWUnLockLatch(&((t)->latch))
|
|
||||||
|
|
||||||
STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg);
|
|
||||||
void tsdbFreeMeta(STsdbMeta* pMeta);
|
|
||||||
int tsdbOpenMeta(STsdbRepo* pRepo);
|
|
||||||
int tsdbCloseMeta(STsdbRepo* pRepo);
|
|
||||||
STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid);
|
|
||||||
STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version);
|
|
||||||
int tsdbWLockRepoMeta(STsdbRepo* pRepo);
|
|
||||||
int tsdbRLockRepoMeta(STsdbRepo* pRepo);
|
|
||||||
int tsdbUnlockRepoMeta(STsdbRepo* pRepo);
|
|
||||||
void tsdbRefTable(STable* pTable);
|
|
||||||
void tsdbUnRefTable(STable* pTable);
|
|
||||||
void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct);
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
|
||||||
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
|
|
||||||
return -1;
|
|
||||||
} else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) {
|
|
||||||
return 1;
|
|
||||||
} else {
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t version) {
|
|
||||||
STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
|
|
||||||
STSchema* pSchema = NULL;
|
|
||||||
STSchema* pTSchema = NULL;
|
|
||||||
|
|
||||||
if (lock) TSDB_RLOCK_TABLE(pDTable);
|
|
||||||
if (version < 0) { // get the latest version of schema
|
|
||||||
pTSchema = pDTable->schema[pDTable->numOfSchemas - 1];
|
|
||||||
} else { // get the schema with version
|
|
||||||
void* ptr = taosbsearch(&version, pDTable->schema, pDTable->numOfSchemas, sizeof(STSchema*),
|
|
||||||
tsdbCompareSchemaVersion, TD_EQ);
|
|
||||||
if (ptr == NULL) {
|
|
||||||
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
|
||||||
goto _exit;
|
|
||||||
}
|
|
||||||
pTSchema = *(STSchema**)ptr;
|
|
||||||
}
|
|
||||||
|
|
||||||
ASSERT(pTSchema != NULL);
|
|
||||||
|
|
||||||
if (copy) {
|
|
||||||
if ((pSchema = tdDupSchema(pTSchema)) == NULL) terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
|
||||||
} else {
|
|
||||||
pSchema = pTSchema;
|
|
||||||
}
|
|
||||||
|
|
||||||
_exit:
|
|
||||||
if (lock) TSDB_RUNLOCK_TABLE(pDTable);
|
|
||||||
return pSchema;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE STSchema* tsdbGetTableSchema(STable* pTable) {
|
|
||||||
return tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
|
|
||||||
if (pTable->type == TSDB_CHILD_TABLE) { // check child table first
|
|
||||||
STable *pSuper = pTable->pSuper;
|
|
||||||
if (pSuper == NULL) return NULL;
|
|
||||||
return pSuper->tagSchema;
|
|
||||||
} else if (pTable->type == TSDB_SUPER_TABLE) {
|
|
||||||
return pTable->tagSchema;
|
|
||||||
} else {
|
|
||||||
return NULL;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) {
|
|
||||||
ASSERT(pTable->lastRow == NULL || pTable->lastKey == dataRowKey(pTable->lastRow));
|
|
||||||
return pTable->lastKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ================= tsdbBuffer.c
|
// ================= tsdbBuffer.c
|
||||||
typedef struct {
|
|
||||||
int64_t blockId;
|
|
||||||
int offset;
|
|
||||||
int remain;
|
|
||||||
char data[];
|
|
||||||
} STsdbBufBlock;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
pthread_cond_t poolNotEmpty;
|
|
||||||
int bufBlockSize;
|
|
||||||
int tBufBlocks;
|
|
||||||
int nBufBlocks;
|
|
||||||
int64_t index;
|
|
||||||
SList* bufBlockList;
|
|
||||||
} STsdbBufPool;
|
|
||||||
|
|
||||||
#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
|
|
||||||
|
|
||||||
STsdbBufPool* tsdbNewBufPool();
|
|
||||||
void tsdbFreeBufPool(STsdbBufPool* pBufPool);
|
|
||||||
int tsdbOpenBufPool(STsdbRepo* pRepo);
|
|
||||||
void tsdbCloseBufPool(STsdbRepo* pRepo);
|
|
||||||
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
|
|
||||||
|
|
||||||
// ------------------ tsdbMemTable.c
|
// ------------------ tsdbMemTable.c
|
||||||
typedef struct {
|
|
||||||
int rowsInserted;
|
|
||||||
int rowsUpdated;
|
|
||||||
int rowsDeleteSucceed;
|
|
||||||
int rowsDeleteFailed;
|
|
||||||
int nOperations;
|
|
||||||
TSKEY keyFirst;
|
|
||||||
TSKEY keyLast;
|
|
||||||
} SMergeInfo;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
STable * pTable;
|
|
||||||
SSkipListIterator *pIter;
|
|
||||||
} SCommitIter;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
uint64_t uid;
|
|
||||||
TSKEY keyFirst;
|
|
||||||
TSKEY keyLast;
|
|
||||||
int64_t numOfRows;
|
|
||||||
SSkipList* pData;
|
|
||||||
} STableData;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
T_REF_DECLARE()
|
|
||||||
SRWLatch latch;
|
|
||||||
TSKEY keyFirst;
|
|
||||||
TSKEY keyLast;
|
|
||||||
int64_t numOfRows;
|
|
||||||
int32_t maxTables;
|
|
||||||
STableData** tData;
|
|
||||||
SList* actList;
|
|
||||||
SList* extraBuffList;
|
|
||||||
SList* bufBlockList;
|
|
||||||
} SMemTable;
|
|
||||||
|
|
||||||
enum { TSDB_UPDATE_META, TSDB_DROP_META };
|
|
||||||
|
|
||||||
#ifdef WINDOWS
|
|
||||||
#pragma pack(push ,1)
|
|
||||||
typedef struct {
|
|
||||||
#else
|
|
||||||
typedef struct __attribute__((packed)){
|
|
||||||
#endif
|
|
||||||
char act;
|
|
||||||
uint64_t uid;
|
|
||||||
} SActObj;
|
|
||||||
#ifdef WINDOWS
|
|
||||||
#pragma pack(pop)
|
|
||||||
#endif
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int len;
|
|
||||||
char cont[];
|
|
||||||
} SActCont;
|
|
||||||
|
|
||||||
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
|
||||||
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
|
||||||
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
|
|
||||||
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem);
|
|
||||||
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
|
|
||||||
int tsdbAsyncCommit(STsdbRepo* pRepo);
|
|
||||||
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
|
|
||||||
TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
|
|
||||||
void* tsdbCommitData(STsdbRepo* pRepo);
|
|
||||||
|
|
||||||
static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
|
|
||||||
if (pIter == NULL) return NULL;
|
|
||||||
|
|
||||||
SSkipListNode* node = tSkipListIterGet(pIter);
|
|
||||||
if (node == NULL) return NULL;
|
|
||||||
|
|
||||||
return (SDataRow)SL_GET_NODE_DATA(node);
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
|
|
||||||
SDataRow row = tsdbNextIterRow(pIter);
|
|
||||||
if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL;
|
|
||||||
|
|
||||||
return dataRowKey(row);
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) {
|
|
||||||
SDataRow row = tsdbNextIterRow(pIter);
|
|
||||||
if (row == NULL) return TKEY_NULL;
|
|
||||||
|
|
||||||
return dataRowTKey(row);
|
|
||||||
}
|
|
||||||
|
|
||||||
// ================= tsdbFile.c
|
// ================= tsdbFile.c
|
||||||
#define TSDB_FILE_HEAD_SIZE 512
|
|
||||||
#define TSDB_FILE_DELIMITER 0xF00AFA0F
|
|
||||||
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
|
|
||||||
|
|
||||||
typedef enum {
|
|
||||||
TSDB_FILE_HEAD = 0,
|
|
||||||
TSDB_FILE_DATA,
|
|
||||||
TSDB_FILE_LAST,
|
|
||||||
TSDB_FILE_MAX,
|
|
||||||
TSDB_FILE_META,
|
|
||||||
TSDB_FILE_MANIFEST
|
|
||||||
} TSDB_FILE_T;
|
|
||||||
|
|
||||||
// For meta file
|
|
||||||
typedef struct {
|
|
||||||
int64_t size;
|
|
||||||
int64_t tombSize;
|
|
||||||
int64_t nRecords;
|
|
||||||
int64_t nDels;
|
|
||||||
uint32_t magic;
|
|
||||||
} SMFInfo;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SMFInfo info;
|
|
||||||
TFILE f;
|
|
||||||
int fd;
|
|
||||||
} SMFile;
|
|
||||||
|
|
||||||
void tsdbInitMFile(SMFile* pMFile, int vid, int ver, SMFInfo* pInfo);
|
|
||||||
int tsdbOpenMFile(SMFile* pMFile, int flags);
|
|
||||||
void tsdbCloseMFile(SMFile* pMFile);
|
|
||||||
int64_t tsdbSeekMFile(SMFile* pMFile, int64_t offset, int whence);
|
|
||||||
int64_t tsdbWriteMFile(SMFile* pMFile, void* buf, int64_t nbyte);
|
|
||||||
int64_t tsdbTellMFile(SMFile *pMFile);
|
|
||||||
int tsdbEncodeMFile(void** buf, SMFile* pMFile);
|
|
||||||
void* tsdbDecodeMFile(void* buf, SMFile* pMFile);
|
|
||||||
|
|
||||||
// For .head/.data/.last file
|
|
||||||
typedef struct {
|
|
||||||
uint32_t magic;
|
|
||||||
uint32_t len;
|
|
||||||
uint32_t totalBlocks;
|
|
||||||
uint32_t totalSubBlocks;
|
|
||||||
uint32_t offset;
|
|
||||||
uint64_t size;
|
|
||||||
uint64_t tombSize;
|
|
||||||
} SDFInfo;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
SDFInfo info;
|
|
||||||
TFILE f;
|
|
||||||
int fd;
|
|
||||||
} SDFile;
|
|
||||||
|
|
||||||
void tsdbInitDFile(SDFile* pDFile, int vid, int fid, int ver, int level, int id, const SDFInfo* pInfo,
|
|
||||||
TSDB_FILE_T ftype);
|
|
||||||
void tsdbInitDFileWithOld(SDFile* pDFile, SDFile* pOldDFile);
|
|
||||||
int tsdbOpenDFile(SDFile* pDFile, int flags);
|
|
||||||
void tsdbCloseDFile(SDFile* pDFile);
|
|
||||||
int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence);
|
|
||||||
int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte);
|
|
||||||
int64_t tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset);
|
|
||||||
int64_t tsdbTellDFile(SDFile* pDFile);
|
|
||||||
int tsdbEncodeDFile(void** buf, SDFile* pDFile);
|
|
||||||
void* tsdbDecodeDFile(void* buf, SDFile* pDFile);
|
|
||||||
void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm);
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int fid;
|
|
||||||
int state;
|
|
||||||
SDFile files[TSDB_FILE_MAX];
|
|
||||||
} SDFileSet;
|
|
||||||
|
|
||||||
#define TSDB_FILE_FULL_NAME(f) TFILE_NAME(&((f)->f))
|
|
||||||
#define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t))
|
|
||||||
|
|
||||||
void tsdbInitDFileSet(SDFileSet* pSet, int vid, int fid, int ver, int level, int id);
|
|
||||||
void tsdbInitDFileSetWithOld(SDFileSet *pSet, SDFileSet *pOldSet);
|
|
||||||
int tsdbOpenDFileSet(SDFileSet* pSet, int flags);
|
|
||||||
void tsdbCloseDFileSet(SDFileSet* pSet);
|
|
||||||
int tsdbUpdateDFileSetHeader(SDFileSet* pSet);
|
|
||||||
int tsdbCopyDFileSet(SDFileSet* pFromSet, SDFileSet* pToSet);
|
|
||||||
|
|
||||||
/* Statistic information of the TSDB file system.
|
/* Statistic information of the TSDB file system.
|
||||||
*/
|
*/
|
||||||
typedef struct {
|
|
||||||
int64_t fsversion; // file system version, related to program
|
|
||||||
int64_t version;
|
|
||||||
int64_t totalPoints;
|
|
||||||
int64_t totalStorage;
|
|
||||||
} STsdbFSMeta;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int64_t version;
|
|
||||||
STsdbFSMeta meta;
|
|
||||||
SMFile mf; // meta file
|
|
||||||
SArray* df; // data file array
|
|
||||||
} SFSVer;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
pthread_rwlock_t lock;
|
|
||||||
|
|
||||||
SFSVer fsv;
|
|
||||||
} STsdbFS;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int version; // current FS version
|
|
||||||
int index;
|
|
||||||
int fid;
|
|
||||||
SDFileSet* pSet;
|
|
||||||
} SFSIter;
|
|
||||||
|
|
||||||
#define TSDB_FILE_INFO(tf) (&((tf)->info))
|
|
||||||
#define TSDB_FILE_F(tf) (&((tf)->f)))
|
|
||||||
#define TSDB_FILE_FD(tf) ((tf)->fd)
|
|
||||||
|
|
||||||
int tsdbOpenFS(STsdbRepo* pRepo);
|
|
||||||
void tsdbCloseFS(STsdbRepo* pRepo);
|
|
||||||
int tsdbFSNewTxn(STsdbRepo* pRepo);
|
|
||||||
int tsdbFSEndTxn(STsdbRepo* pRepo, bool hasError);
|
|
||||||
int tsdbUpdateMFile(STsdbRepo* pRepo, SMFile* pMFile);
|
|
||||||
int tsdbUpdateDFileSet(STsdbRepo* pRepo, SDFileSet* pSet);
|
|
||||||
void tsdbRemoveExpiredDFileSet(STsdbRepo* pRepo, int mfid);
|
|
||||||
int tsdbRemoveDFileSet(SDFileSet* pSet);
|
|
||||||
int tsdbEncodeMFInfo(void** buf, SMFInfo* pInfo);
|
|
||||||
void* tsdbDecodeMFInfo(void* buf, SMFInfo* pInfo);
|
|
||||||
SDFileSet tsdbMoveDFileSet(SDFileSet* pOldSet, int to);
|
|
||||||
int tsdbInitFSIter(STsdbRepo* pRepo, SFSIter* pIter);
|
|
||||||
SDFileSet* tsdbFSIterNext(SFSIter* pIter);
|
|
||||||
int tsdbCreateDFileSet(int fid, int level, SDFileSet* pSet);
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbRLockFS(STsdbFS *pFs) {
|
|
||||||
int code = pthread_rwlock_rdlock(&(pFs->lock));
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbWLockFS(STsdbFS *pFs) {
|
|
||||||
int code = pthread_rwlock_wrlock(&(pFs->lock));
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
|
|
||||||
int code = pthread_rwlock_unlock(&(pFs->lock));
|
|
||||||
if (code != 0) {
|
|
||||||
terrno = TAOS_SYSTEM_ERROR(code);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
// ================= tsdbStore.c
|
// ================= tsdbStore.c
|
||||||
#define KVSTORE_FILE_VERSION ((uint32_t)0)
|
#define KVSTORE_FILE_VERSION ((uint32_t)0)
|
||||||
|
|
||||||
|
@ -577,63 +178,7 @@ void tsdbGetFidKeyRange(int daysPerFile, int8_t precision, int fileId, TS
|
||||||
// int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup);
|
// int tsdbApplyRetention(STsdbRepo* pRepo, SFidGroup *pFidGroup);
|
||||||
|
|
||||||
// ================= tsdbMain.c
|
// ================= tsdbMain.c
|
||||||
typedef struct {
|
|
||||||
int32_t totalLen;
|
|
||||||
int32_t len;
|
|
||||||
SDataRow row;
|
|
||||||
} SSubmitBlkIter;
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
int32_t totalLen;
|
|
||||||
int32_t len;
|
|
||||||
void * pMsg;
|
|
||||||
} SSubmitMsgIter;
|
|
||||||
|
|
||||||
struct STsdbRepo {
|
|
||||||
int8_t state;
|
|
||||||
|
|
||||||
char* rootDir;
|
|
||||||
STsdbCfg config;
|
|
||||||
STsdbAppH appH;
|
|
||||||
STsdbStat stat;
|
|
||||||
STsdbMeta* tsdbMeta;
|
|
||||||
STsdbBufPool* pPool;
|
|
||||||
SMemTable* mem;
|
|
||||||
SMemTable* imem;
|
|
||||||
STsdbFS* fs;
|
|
||||||
sem_t readyToCommit;
|
|
||||||
pthread_mutex_t mutex;
|
|
||||||
bool repoLocked;
|
|
||||||
int32_t code; // Commit code
|
|
||||||
};
|
|
||||||
|
|
||||||
#define REPO_ID(r) (r)->config.tsdbId
|
|
||||||
#define REPO_CFG(r) (&((r)->config))
|
|
||||||
#define IS_REPO_LOCKED(r) (r)->repoLocked
|
|
||||||
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
|
|
||||||
|
|
||||||
char* tsdbGetMetaFileName(char* rootDir);
|
|
||||||
void tsdbGetDataFileName(char* rootDir, int vid, int fid, int type, char* fname);
|
|
||||||
int tsdbLockRepo(STsdbRepo* pRepo);
|
|
||||||
int tsdbUnlockRepo(STsdbRepo* pRepo);
|
|
||||||
char* tsdbGetDataDirName(char* rootDir);
|
|
||||||
int tsdbGetNextMaxTables(int tid);
|
|
||||||
STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo);
|
|
||||||
STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo);
|
|
||||||
int tsdbCheckCommit(STsdbRepo* pRepo);
|
|
||||||
|
|
||||||
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
|
|
||||||
ASSERT(pRepo != NULL);
|
|
||||||
if (pRepo->mem == NULL) return NULL;
|
|
||||||
|
|
||||||
SListNode* pNode = listTail(pRepo->mem->bufBlockList);
|
|
||||||
if (pNode == NULL) return NULL;
|
|
||||||
|
|
||||||
STsdbBufBlock* pBufBlock = NULL;
|
|
||||||
tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void*)(&pBufBlock));
|
|
||||||
|
|
||||||
return pBufBlock;
|
|
||||||
}
|
|
||||||
|
|
||||||
#include "tsdbReadImpl.h"
|
#include "tsdbReadImpl.h"
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,117 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_TSDB_MEMTABLE_H_
|
||||||
|
#define _TD_TSDB_MEMTABLE_H_
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int rowsInserted;
|
||||||
|
int rowsUpdated;
|
||||||
|
int rowsDeleteSucceed;
|
||||||
|
int rowsDeleteFailed;
|
||||||
|
int nOperations;
|
||||||
|
TSKEY keyFirst;
|
||||||
|
TSKEY keyLast;
|
||||||
|
} SMergeInfo;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
STable * pTable;
|
||||||
|
SSkipListIterator *pIter;
|
||||||
|
} SCommitIter;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
uint64_t uid;
|
||||||
|
TSKEY keyFirst;
|
||||||
|
TSKEY keyLast;
|
||||||
|
int64_t numOfRows;
|
||||||
|
SSkipList* pData;
|
||||||
|
} STableData;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
T_REF_DECLARE()
|
||||||
|
SRWLatch latch;
|
||||||
|
TSKEY keyFirst;
|
||||||
|
TSKEY keyLast;
|
||||||
|
int64_t numOfRows;
|
||||||
|
int32_t maxTables;
|
||||||
|
STableData** tData;
|
||||||
|
SList* actList;
|
||||||
|
SList* extraBuffList;
|
||||||
|
SList* bufBlockList;
|
||||||
|
} SMemTable;
|
||||||
|
|
||||||
|
enum { TSDB_UPDATE_META, TSDB_DROP_META };
|
||||||
|
|
||||||
|
#ifdef WINDOWS
|
||||||
|
#pragma pack(push ,1)
|
||||||
|
typedef struct {
|
||||||
|
#else
|
||||||
|
typedef struct __attribute__((packed)){
|
||||||
|
#endif
|
||||||
|
char act;
|
||||||
|
uint64_t uid;
|
||||||
|
} SActObj;
|
||||||
|
#ifdef WINDOWS
|
||||||
|
#pragma pack(pop)
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int len;
|
||||||
|
char cont[];
|
||||||
|
} SActCont;
|
||||||
|
|
||||||
|
int tsdbRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
||||||
|
int tsdbUnRefMemTable(STsdbRepo* pRepo, SMemTable* pMemTable);
|
||||||
|
int tsdbTakeMemSnapshot(STsdbRepo* pRepo, SMemTable** pMem, SMemTable** pIMem);
|
||||||
|
void tsdbUnTakeMemSnapShot(STsdbRepo* pRepo, SMemTable* pMem, SMemTable* pIMem);
|
||||||
|
void* tsdbAllocBytes(STsdbRepo* pRepo, int bytes);
|
||||||
|
int tsdbAsyncCommit(STsdbRepo* pRepo);
|
||||||
|
int tsdbLoadDataFromCache(STable* pTable, SSkipListIterator* pIter, TSKEY maxKey, int maxRowsToRead, SDataCols* pCols,
|
||||||
|
TKEY* filterKeys, int nFilterKeys, bool keepDup, SMergeInfo* pMergeInfo);
|
||||||
|
void* tsdbCommitData(STsdbRepo* pRepo);
|
||||||
|
|
||||||
|
static FORCE_INLINE SDataRow tsdbNextIterRow(SSkipListIterator* pIter) {
|
||||||
|
if (pIter == NULL) return NULL;
|
||||||
|
|
||||||
|
SSkipListNode* node = tSkipListIterGet(pIter);
|
||||||
|
if (node == NULL) return NULL;
|
||||||
|
|
||||||
|
return (SDataRow)SL_GET_NODE_DATA(node);
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator* pIter) {
|
||||||
|
SDataRow row = tsdbNextIterRow(pIter);
|
||||||
|
if (row == NULL) return TSDB_DATA_TIMESTAMP_NULL;
|
||||||
|
|
||||||
|
return dataRowKey(row);
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE TKEY tsdbNextIterTKey(SSkipListIterator* pIter) {
|
||||||
|
SDataRow row = tsdbNextIterRow(pIter);
|
||||||
|
if (row == NULL) return TKEY_NULL;
|
||||||
|
|
||||||
|
return dataRowTKey(row);
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* _TD_TSDB_MEMTABLE_H_ */
|
|
@ -0,0 +1,151 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_TSDB_META_H_
|
||||||
|
#define _TD_TSDB_META_H_
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#define TSDB_MAX_TABLE_SCHEMAS 16
|
||||||
|
|
||||||
|
typedef struct STable {
|
||||||
|
STableId tableId;
|
||||||
|
ETableType type;
|
||||||
|
tstr* name; // NOTE: there a flexible string here
|
||||||
|
uint64_t suid;
|
||||||
|
struct STable* pSuper; // super table pointer
|
||||||
|
uint8_t numOfSchemas;
|
||||||
|
STSchema* schema[TSDB_MAX_TABLE_SCHEMAS];
|
||||||
|
STSchema* tagSchema;
|
||||||
|
SKVRow tagVal;
|
||||||
|
SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
|
||||||
|
void* eventHandler; // TODO
|
||||||
|
void* streamHandler; // TODO
|
||||||
|
TSKEY lastKey;
|
||||||
|
SDataRow lastRow;
|
||||||
|
char* sql;
|
||||||
|
void* cqhandle;
|
||||||
|
SRWLatch latch; // TODO: implementa latch functions
|
||||||
|
T_REF_DECLARE()
|
||||||
|
} STable;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
pthread_rwlock_t rwLock;
|
||||||
|
|
||||||
|
int32_t nTables;
|
||||||
|
int32_t maxTables;
|
||||||
|
STable** tables;
|
||||||
|
SList* superList;
|
||||||
|
SHashObj* uidMap;
|
||||||
|
SKVStore* pStore;
|
||||||
|
int maxRowBytes;
|
||||||
|
int maxCols;
|
||||||
|
} STsdbMeta;
|
||||||
|
|
||||||
|
#define TSDB_INIT_NTABLES 1024
|
||||||
|
#define TABLE_TYPE(t) (t)->type
|
||||||
|
#define TABLE_NAME(t) (t)->name
|
||||||
|
#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data
|
||||||
|
#define TABLE_UID(t) (t)->tableId.uid
|
||||||
|
#define TABLE_TID(t) (t)->tableId.tid
|
||||||
|
#define TABLE_SUID(t) (t)->suid
|
||||||
|
#define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore)
|
||||||
|
#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch))
|
||||||
|
#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch))
|
||||||
|
#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch))
|
||||||
|
#define TSDB_WUNLOCK_TABLE(t) taosWUnLockLatch(&((t)->latch))
|
||||||
|
|
||||||
|
STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg);
|
||||||
|
void tsdbFreeMeta(STsdbMeta* pMeta);
|
||||||
|
int tsdbOpenMeta(STsdbRepo* pRepo);
|
||||||
|
int tsdbCloseMeta(STsdbRepo* pRepo);
|
||||||
|
STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid);
|
||||||
|
STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t version);
|
||||||
|
int tsdbWLockRepoMeta(STsdbRepo* pRepo);
|
||||||
|
int tsdbRLockRepoMeta(STsdbRepo* pRepo);
|
||||||
|
int tsdbUnlockRepoMeta(STsdbRepo* pRepo);
|
||||||
|
void tsdbRefTable(STable* pTable);
|
||||||
|
void tsdbUnRefTable(STable* pTable);
|
||||||
|
void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct);
|
||||||
|
|
||||||
|
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
||||||
|
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
|
||||||
|
return -1;
|
||||||
|
} else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) {
|
||||||
|
return 1;
|
||||||
|
} else {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t version) {
|
||||||
|
STable* pDTable = (TABLE_TYPE(pTable) == TSDB_CHILD_TABLE) ? pTable->pSuper : pTable;
|
||||||
|
STSchema* pSchema = NULL;
|
||||||
|
STSchema* pTSchema = NULL;
|
||||||
|
|
||||||
|
if (lock) TSDB_RLOCK_TABLE(pDTable);
|
||||||
|
if (version < 0) { // get the latest version of schema
|
||||||
|
pTSchema = pDTable->schema[pDTable->numOfSchemas - 1];
|
||||||
|
} else { // get the schema with version
|
||||||
|
void* ptr = taosbsearch(&version, pDTable->schema, pDTable->numOfSchemas, sizeof(STSchema*),
|
||||||
|
tsdbCompareSchemaVersion, TD_EQ);
|
||||||
|
if (ptr == NULL) {
|
||||||
|
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||||
|
goto _exit;
|
||||||
|
}
|
||||||
|
pTSchema = *(STSchema**)ptr;
|
||||||
|
}
|
||||||
|
|
||||||
|
ASSERT(pTSchema != NULL);
|
||||||
|
|
||||||
|
if (copy) {
|
||||||
|
if ((pSchema = tdDupSchema(pTSchema)) == NULL) terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||||
|
} else {
|
||||||
|
pSchema = pTSchema;
|
||||||
|
}
|
||||||
|
|
||||||
|
_exit:
|
||||||
|
if (lock) TSDB_RUNLOCK_TABLE(pDTable);
|
||||||
|
return pSchema;
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE STSchema* tsdbGetTableSchema(STable* pTable) {
|
||||||
|
return tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
|
||||||
|
if (pTable->type == TSDB_CHILD_TABLE) { // check child table first
|
||||||
|
STable *pSuper = pTable->pSuper;
|
||||||
|
if (pSuper == NULL) return NULL;
|
||||||
|
return pSuper->tagSchema;
|
||||||
|
} else if (pTable->type == TSDB_SUPER_TABLE) {
|
||||||
|
return pTable->tagSchema;
|
||||||
|
} else {
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) {
|
||||||
|
ASSERT(pTable->lastRow == NULL || pTable->lastKey == dataRowKey(pTable->lastRow));
|
||||||
|
return pTable->lastKey;
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* _TD_TSDB_META_H_ */
|
|
@ -0,0 +1,106 @@
|
||||||
|
/*
|
||||||
|
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||||
|
*
|
||||||
|
* This program is free software: you can use, redistribute, and/or modify
|
||||||
|
* it under the terms of the GNU Affero General Public License, version 3
|
||||||
|
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||||
|
*
|
||||||
|
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||||
|
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||||
|
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||||
|
*
|
||||||
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
|
*/
|
||||||
|
|
||||||
|
#ifndef _TD_TSDB_INT_H_
|
||||||
|
#define _TD_TSDB_INT_H_
|
||||||
|
|
||||||
|
#include "os.h"
|
||||||
|
#include "tlog.h"
|
||||||
|
#include "taosdef.h"
|
||||||
|
#include "tskiplist.h"
|
||||||
|
#include "tdataformat.h"
|
||||||
|
#include "tlockfree.h"
|
||||||
|
#include "tlist.h"
|
||||||
|
#include "hash.h"
|
||||||
|
#include "tarray.h"
|
||||||
|
|
||||||
|
#include "tsdb.h"
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
extern "C" {
|
||||||
|
#endif
|
||||||
|
|
||||||
|
typedef struct STsdbRepo STsdbRepo;
|
||||||
|
|
||||||
|
// Log
|
||||||
|
#include "tsdbLog.h"
|
||||||
|
// Meta
|
||||||
|
#include "tsdbMeta.h"
|
||||||
|
// Buffer
|
||||||
|
#include "tsdbBuffer.h"
|
||||||
|
// MemTable
|
||||||
|
#include "tsdbMemTable.h"
|
||||||
|
// File
|
||||||
|
#include "tsdbFile.h"
|
||||||
|
// FS
|
||||||
|
#include "tsdbFS.h"
|
||||||
|
// ReadImpl
|
||||||
|
#include "tsdbReadImpl.h"
|
||||||
|
// Commit
|
||||||
|
#include "tsdbCommit.h"
|
||||||
|
// Commit Queue
|
||||||
|
#include "tsdbCommitQueue.h"
|
||||||
|
// Main definitions
|
||||||
|
struct STsdbRepo {
|
||||||
|
int8_t state;
|
||||||
|
|
||||||
|
char* rootDir;
|
||||||
|
STsdbCfg config;
|
||||||
|
STsdbAppH appH;
|
||||||
|
STsdbStat stat;
|
||||||
|
STsdbMeta* tsdbMeta;
|
||||||
|
STsdbBufPool* pPool;
|
||||||
|
SMemTable* mem;
|
||||||
|
SMemTable* imem;
|
||||||
|
STsdbFS* fs;
|
||||||
|
sem_t readyToCommit;
|
||||||
|
pthread_mutex_t mutex;
|
||||||
|
bool repoLocked;
|
||||||
|
int32_t code; // Commit code
|
||||||
|
};
|
||||||
|
|
||||||
|
#define REPO_ID(r) (r)->config.tsdbId
|
||||||
|
#define REPO_CFG(r) (&((r)->config))
|
||||||
|
#define IS_REPO_LOCKED(r) (r)->repoLocked
|
||||||
|
#define TSDB_SUBMIT_MSG_HEAD_SIZE sizeof(SSubmitMsg)
|
||||||
|
|
||||||
|
char* tsdbGetMetaFileName(char* rootDir);
|
||||||
|
void tsdbGetDataFileName(char* rootDir, int vid, int fid, int type, char* fname);
|
||||||
|
int tsdbLockRepo(STsdbRepo* pRepo);
|
||||||
|
int tsdbUnlockRepo(STsdbRepo* pRepo);
|
||||||
|
char* tsdbGetDataDirName(char* rootDir);
|
||||||
|
int tsdbGetNextMaxTables(int tid);
|
||||||
|
STsdbMeta* tsdbGetMeta(TSDB_REPO_T* pRepo);
|
||||||
|
STsdbFileH* tsdbGetFile(TSDB_REPO_T* pRepo);
|
||||||
|
int tsdbCheckCommit(STsdbRepo* pRepo);
|
||||||
|
|
||||||
|
static FORCE_INLINE STsdbBufBlock* tsdbGetCurrBufBlock(STsdbRepo* pRepo) {
|
||||||
|
ASSERT(pRepo != NULL);
|
||||||
|
if (pRepo->mem == NULL) return NULL;
|
||||||
|
|
||||||
|
SListNode* pNode = listTail(pRepo->mem->bufBlockList);
|
||||||
|
if (pNode == NULL) return NULL;
|
||||||
|
|
||||||
|
STsdbBufBlock* pBufBlock = NULL;
|
||||||
|
tdListNodeGetData(pRepo->mem->bufBlockList, pNode, (void*)(&pBufBlock));
|
||||||
|
|
||||||
|
return pBufBlock;
|
||||||
|
}
|
||||||
|
|
||||||
|
#ifdef __cplusplus
|
||||||
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
|
#endif /* _TD_TSDB_INT_H_ */
|
|
@ -13,8 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "tsdb.h"
|
#include "tsdbint.h"
|
||||||
#include "tsdbMain.h"
|
|
||||||
|
|
||||||
#define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0)
|
#define POOL_IS_EMPTY(b) (listNEles((b)->bufBlockList) == 0)
|
||||||
|
|
||||||
|
|
|
@ -19,6 +19,18 @@
|
||||||
#define TSDB_DATA_SKIPLIST_LEVEL 5
|
#define TSDB_DATA_SKIPLIST_LEVEL 5
|
||||||
#define TSDB_MAX_INSERT_BATCH 512
|
#define TSDB_MAX_INSERT_BATCH 512
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t totalLen;
|
||||||
|
int32_t len;
|
||||||
|
SDataRow row;
|
||||||
|
} SSubmitBlkIter;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
int32_t totalLen;
|
||||||
|
int32_t len;
|
||||||
|
void * pMsg;
|
||||||
|
} SSubmitMsgIter;
|
||||||
|
|
||||||
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
|
static SMemTable * tsdbNewMemTable(STsdbRepo *pRepo);
|
||||||
static void tsdbFreeMemTable(SMemTable *pMemTable);
|
static void tsdbFreeMemTable(SMemTable *pMemTable);
|
||||||
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
|
static STableData *tsdbNewTableData(STsdbCfg *pCfg, STable *pTable);
|
||||||
|
|
Loading…
Reference in New Issue