This commit is contained in:
Hongze Cheng 2022-01-06 08:47:26 +00:00
parent cb96d68596
commit 3a2975f157
2 changed files with 445 additions and 17 deletions

View File

@ -0,0 +1,437 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_H_
#define _TD_TSDB_H_
#include <pthread.h>
#include <stdbool.h>
#include <stdint.h>
#include "taosdef.h"
#include "taosmsg.h"
#include "tarray.h"
#include "tdataformat.h"
#include "tname.h"
#include "hash.h"
#include "tlockfree.h"
#include "tlist.h"
#ifdef __cplusplus
extern "C" {
#endif
// --------- TSDB VERSION
#define TSDB_VERSION_MAJOR 1
#define TSDB_VERSION_MINOR 0

// Sentinel super-table id (the name suggests "no / invalid super table").
// The expansion is parenthesized so that the leading minus sign can never be
// absorbed as a binary operator by whatever precedes the macro at a use site.
#define TSDB_INVALID_SUPER_TABLE_ID (-1)

// --------- TSDB COMMIT STATUS CODES
// NOTE(review): presumably the `status` values handed to STsdbAppH.notifyStatus -- confirm.
#define TSDB_STATUS_COMMIT_START 1
#define TSDB_STATUS_COMMIT_OVER 2
#define TSDB_STATUS_COMMIT_NOBLOCK 3  // commit no block, need to be solved

// --------- TSDB STATE DEFINITION
// NOTE(review): the values are distinct bits, so they can presumably be OR-ed together -- confirm.
#define TSDB_STATE_OK 0x0
#define TSDB_STATE_BAD_META 0x1
#define TSDB_STATE_BAD_DATA 0x2
// --------- TSDB APPLICATION HANDLE DEFINITION
// Bundle of opaque handles and callbacks supplied by the embedding
// application; a pointer to it is passed to tsdbOpenRepo().
typedef struct {
  void *appH;  // opaque handle (presumably the application context -- confirm)
  void *cqH;   // opaque handle (presumably the continuous-query context -- confirm)

  // Callbacks. Each takes an opaque `void *` as its first argument.
  int   (*notifyStatus)(void *, int status, int eno);
  int   (*eventCallBack)(void *);
  void *(*cqCreateFunc)(void *handle, uint64_t uid, int32_t sid, const char *dstTable, char *sqlStr,
                        STSchema *pSchema, int start);
  void  (*cqDropFunc)(void *handle);
} STsdbAppH;
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
typedef struct {
  int32_t tsdbId;
  int32_t cacheBlockSize;
  int32_t totalBlocks;
  int32_t daysPerFile;          // day per file sharding policy
  int32_t keep;                 // day of data to keep
  int32_t keep1;
  int32_t keep2;
  int32_t minRowsPerFileBlock;  // minimum rows per file block
  int32_t maxRowsPerFileBlock;  // maximum rows per file block
  int8_t  precision;
  int8_t  compression;
  int8_t  update;
  int8_t  cacheLastRow;  // 0: no cache, 1: cache last row, 2: cache last NULL column, 3: 1&2
} STsdbCfg;

// Predicates on the cacheLastRow flags. `c` must be a pointer to a struct
// that has a `cacheLastRow` member (e.g. STsdbCfg *). Each evaluates to 0 or 1.
#define CACHE_NO_LAST(c) (!(c)->cacheLastRow)
#define CACHE_LAST_ROW(c) (((c)->cacheLastRow & 1) != 0)
#define CACHE_LAST_NULL_COLUMN(c) (((c)->cacheLastRow & 2) != 0)
// --------- TSDB REPOSITORY USAGE STATISTICS
typedef struct {
  int64_t totalStorage;   // total bytes occupied
  int64_t compStorage;    // NOTE(review): presumably bytes occupied after compression -- confirm
  int64_t pointsWritten;  // total data points written
} STsdbStat;
// Repository handle type. Only forward-declared here, so it is opaque to
// users of this header.
typedef struct STsdbRepo STsdbRepo;
// Accessor for the configuration attached to `repo`.
// NOTE(review): the result is a non-const pointer although `repo` is const --
// confirm whether callers are allowed to modify the returned configuration.
STsdbCfg *tsdbGetCfg(const STsdbRepo *repo);
// --------- TSDB REPOSITORY DEFINITION
// Repository lifecycle functions. The return-value convention is not visible
// in this header; presumably 0 means success -- TODO confirm.
int32_t tsdbCreateRepo(int repoid);
int32_t tsdbDropRepo(int repoid);
// Opens a repository from a configuration and an application handle bundle.
// NOTE(review): presumably returns NULL on failure -- confirm.
STsdbRepo *tsdbOpenRepo(STsdbCfg *pCfg, STsdbAppH *pAppH);
// NOTE(review): `toCommit` presumably requests a commit of pending data
// before the repository is closed when non-zero -- confirm.
int tsdbCloseRepo(STsdbRepo *repo, int toCommit);
int32_t tsdbConfigRepo(STsdbRepo *repo, STsdbCfg *pCfg);
// NOTE(review): presumably returns TSDB_STATE_* value(s) -- confirm.
int tsdbGetState(STsdbRepo *repo);
int8_t tsdbGetCompactState(STsdbRepo *repo);
// --------- TSDB TABLE DEFINITION
// Pair of identifiers naming one table.
typedef struct {
  uint64_t uid;  // the unique table ID
  int32_t  tid;  // the table ID in the repository.
} STableId;
// --------- TSDB TABLE configuration
// Description of a table, consumed by tsdbCreateTable() and released with
// tsdbClearTableCfg().
typedef struct {
  ETableType type;
  char      *name;
  STableId   tableId;
  int32_t    sversion;   // NOTE(review): presumably the schema version -- confirm
  char      *sname;      // super table name
  uint64_t   superUid;   // NOTE(review): presumably the uid of the super table -- confirm
  STSchema  *schema;
  STSchema  *tagSchema;
  SKVRow     tagValues;
  char      *sql;
} STableCfg;
// Table-level API.
// NOTE(review): ownership and return-value conventions are not visible in
// this header; confirm against the implementation before relying on them.
void tsdbClearTableCfg(STableCfg *config);
// `pTable` is an untyped table object pointer; `colId` and `type` select the tag.
void *tsdbGetTableTagVal(const void *pTable, int32_t colId, int16_t type);
char *tsdbGetTableName(void *pTable);
// Reinterprets a table object pointer as an STableId pointer.
// NOTE(review): this is only valid if the table object starts with an
// STableId -- confirm against the table struct definition.
#define TSDB_TABLEID(_table) ((STableId*) (_table))
#define TSDB_PREV_ROW 0x1
#define TSDB_NEXT_ROW 0x2
// Builds a table configuration from a create-table message.
// NOTE(review): presumably heap-allocated and owned by the caller -- confirm.
STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg);
int tsdbCreateTable(STsdbRepo *repo, STableCfg *pCfg);
int tsdbDropTable(STsdbRepo *pRepo, STableId tableId);
int tsdbUpdateTableTagValue(STsdbRepo *repo, SUpdateTableTagValMsg *pMsg);
// NOTE(review): `name`, `index` and `size` are non-const pointers and so are
// presumably outputs (or in/out) -- confirm.
uint32_t tsdbGetFileInfo(STsdbRepo *repo, char *name, uint32_t *index, uint32_t eindex, int64_t *size);
// the TSDB repository info
typedef struct STsdbRepoInfo {
  STsdbCfg tsdbCfg;            // repository configuration, stored by value
  uint64_t version;            // version of the repository
  int64_t  tsdbTotalDataSize;  // the original inserted data size
  int64_t  tsdbTotalDiskSize;  // the total disk size taken by this TSDB repository
  // TODO: other information to add
} STsdbRepoInfo;
// Returns repository information for `pRepo`.
// NOTE(review): ownership of the returned STsdbRepoInfo is not visible here -- confirm who frees it.
STsdbRepoInfo *tsdbGetStatus(STsdbRepo *pRepo);
// the meter information report structure
// (the fields describe a table: its configuration, a version and two sizes)
typedef struct {
  STableCfg tableCfg;            // table configuration, stored by value
  uint64_t  version;
  int64_t   tableTotalDataSize;  // In bytes
  int64_t   tableTotalDiskSize;  // In bytes
} STableInfo;
// -- FOR INSERT DATA
/**
 * Insert data to a table in a repository
 * @param repo the TSDB repository handle
 * @param pMsg the data to insert, as a submit message (will give a more specific description)
 * @param pRsp submit response message (NOTE(review): presumably filled in by this call -- confirm)
 *
 * @return the number of points inserted, -1 for failure and the error number is set
 */
int32_t tsdbInsertData(STsdbRepo *repo, SSubmitMsg *pMsg, SShellSubmitRspMsg *pRsp);
// -- FOR QUERY TIME SERIES DATA
// Note: because this is `void *`, a parameter or return type written as
// `TsdbQueryHandleT *` in this header is a `void **`.
typedef void *TsdbQueryHandleT; // Use void to hide implementation details
// NOTE(review): presumably the allowed values of STsdbQueryCond.type ("data block load type") -- confirm.
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
// query condition to build multi-table data block iterator
typedef struct STsdbQueryCond {
  STimeWindow  twindow;
  int32_t      order;             // desc|asc order to iterate the data block
  int64_t      offset;            // skip offset put down to tsdb
  int32_t      numOfCols;         // NOTE(review): presumably the element count of colList -- confirm
  SColumnInfo *colList;
  bool         loadExternalRows;  // load external rows or not
  int32_t      type;              // data block load type (presumably one of BLOCK_LOAD_* -- confirm)
} STsdbQueryCond;
// Per-table in-memory data; only forward-declared in this header.
typedef struct STableData STableData;

// In-memory table.
typedef struct {
  T_REF_DECLARE()  // project macro; expands to member declaration(s), hence no trailing ';'
  SRWLatch     latch;
  TSKEY        keyFirst;
  TSKEY        keyLast;
  int64_t      numOfRows;
  int32_t      maxTables;  // NOTE(review): presumably the capacity of tData -- confirm
  STableData **tData;
  SList       *actList;
  SList       *extraBuffList;
  SList       *bufBlockList;
  int64_t      pointsAdd;   // TODO
  int64_t      storageAdd;  // TODO
} SMemTable;
// Snapshot of memory tables: three SMemTable pointers plus one SMemTable
// stored by value (`mtable`).
typedef struct {
  SMemTable *mem;
  SMemTable *imem;
  SMemTable  mtable;
  SMemTable *omem;
} SMemSnapshot;
// A memory-table snapshot together with an integer `ref` field
// (NOTE(review): presumably a reference count -- confirm).
typedef struct SMemRef {
  int32_t      ref;
  SMemSnapshot snapshot;
} SMemRef;
// Description of one data block, filled in through tsdbRetrieveDataBlockInfo().
typedef struct SDataBlockInfo {
  STimeWindow window;
  int32_t     rows;
  int32_t     numOfCols;
  int64_t     uid;
  int32_t     tid;
} SDataBlockInfo;
// Single-counter record.
// NOTE(review): presumably one histogram bucket of the file block
// distribution (see TSDB_BLOCK_DIST_STEP_ROWS) -- confirm.
typedef struct SFileBlockInfo {
  int32_t numBlocksOfStep;
} SFileBlockInfo;
// An untyped table object pointer paired with a timestamp key.
typedef struct {
  void *pTable;
  TSKEY lastKey;
} STableKeyInfo;
// Grouped table list used as input to the tsdbQuery*() functions and
// released with tsdbDestroyTableGroup().
typedef struct {
  uint32_t  numOfTables;
  SArray   *pGroupList;
  SHashObj *map;  // speedup acquire the tableQueryInfo by table uid
} STableGroupInfo;
#define TSDB_BLOCK_DIST_STEP_ROWS 16

// Block distribution summary, filled in through tsdbGetFileBlocksDistInfo().
typedef struct {
  uint16_t rowSize;
  uint16_t numOfFiles;
  uint32_t numOfTables;
  uint64_t totalSize;
  uint64_t totalRows;
  int32_t  maxRows;
  int32_t  minRows;
  int32_t  firstSeekTimeUs;
  uint32_t numOfRowsInMemTable;
  uint32_t numOfSmallBlocks;
  SArray  *dataBlockInfos;  // NOTE(review): presumably an array of SFileBlockInfo -- confirm
} STableBlockDist;
/**
 * Get the data block iterator, starting from position according to the query condition
 *
 * @param tsdb            tsdb handle
 * @param pCond           query condition, including time window, result set order, and basic required columns for each block
 * @param tableInfoGroup  table object list in the form of set, grouped into different sets according to the
 *                        group by condition
 * @param qId             query id (NOTE(review): formerly documented as "qinfo: query info handle from query
 *                        processor", but the parameter is a uint64_t -- confirm its meaning)
 * @param pRef            memory-table reference, see SMemRef (NOTE(review): ownership not visible here)
 * @return the query handle.
 *         NOTE(review): declared as TsdbQueryHandleT* (i.e. void**), whereas tsdbQueryLastRow() and
 *         tsdbQueryCacheLast() return TsdbQueryHandleT -- confirm which is intended.
 */
TsdbQueryHandleT *tsdbQueryTables(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
SMemRef *pRef);
/**
 * Get the last row of the given query time window for all the tables in STableGroupInfo object.
 * Note that only one data block with only one row will be returned while invoking retrieve data block function for
 * all tables in this group.
 *
 * @param tsdb       tsdb handle
 * @param pCond      query condition, including time window, result set order, and basic required columns for each block
 * @param tableInfo  table list.
 * @param qId        query id
 * @param pRef       memory-table reference, see SMemRef
 * @return the query handle
 */
TsdbQueryHandleT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
SMemRef *pRef);
// Same parameter shape as tsdbQueryLastRow().
// NOTE(review): presumably serves the query from the cached last row / last
// NULL column (see STsdbCfg.cacheLastRow) -- confirm.
TsdbQueryHandleT tsdbQueryCacheLast(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, SMemRef* pMemRef);
// NOTE(review): takes TsdbQueryHandleT* (void**), not TsdbQueryHandleT -- confirm what callers pass.
bool isTsdbCacheLastRow(TsdbQueryHandleT* pQueryHandle);
/**
 * get the queried table object list
 * @param pHandle query handle (declared as TsdbQueryHandleT*, i.e. void**)
 * @return the table list as an SArray (NOTE(review): element type and ownership not visible here -- confirm)
 */
SArray *tsdbGetQueriedTableList(TsdbQueryHandleT *pHandle);
/**
 * get the group list according to table id from client
 * NOTE(review): this summary does not obviously describe tsdbQueryRowsInExternalWindow(), which returns a
 * query handle; it may have been written for a different function -- confirm and rewrite.
 * @param tsdb       tsdb handle
 * @param pCond      query condition
 * @param groupList  table group list
 * @param qId        query id (formerly documented as "qinfo")
 * @param pRef       memory-table reference, see SMemRef
 * @return the query handle
 */
TsdbQueryHandleT tsdbQueryRowsInExternalWindow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList,
uint64_t qId, SMemRef *pRef);
/**
 * get num of rows in mem table
 *
 * @param pHandle query handle (declared as TsdbQueryHandleT*, i.e. void**)
 * @return number of rows in the mem table (earlier text said "row size", which does not match the summary)
 */
int64_t tsdbGetNumOfRowsInMemTable(TsdbQueryHandleT* pHandle);
/**
 * move to next block if exists
 *
 * @param pQueryHandle query handle
 * @return NOTE(review): presumably true when the handle was advanced to a next block and false when there is
 *         none -- confirm against the implementation
 */
bool tsdbNextDataBlock(TsdbQueryHandleT pQueryHandle);
/**
 * Get current data block information
 *
 * @param pQueryHandle query handle (declared as TsdbQueryHandleT*, i.e. void**)
 * @param pBlockInfo   output: description of the current data block
 */
void tsdbRetrieveDataBlockInfo(TsdbQueryHandleT *pQueryHandle, SDataBlockInfo *pBlockInfo);
/**
 *
 * Get the pre-calculated information w.r.t. current data block.
 *
 * In case of data block in cache, the pBlockStatis will always be NULL.
 * If a block is not completely loaded from disk, the pBlockStatis will be NULL.
 * @param pQueryHandle query handle (declared as TsdbQueryHandleT*, i.e. void**)
 * @param pBlockStatis output: the pre-calculated value for current data blocks
 * @return if the block is a cache block, always return 0
 *         (NOTE(review): other return values are not documented here -- confirm)
 */
int32_t tsdbRetrieveDataBlockStatisInfo(TsdbQueryHandleT *pQueryHandle, SDataStatis **pBlockStatis);
/**
 *
 * The query condition with primary timestamp is passed to the iterator when it is constructed;
 * the returned data block always satisfies the time window condition,
 * which means the returned data block is not necessarily a complete on-disk data block.
 *
 * @param pQueryHandle   query handle (declared as TsdbQueryHandleT*, i.e. void**)
 * @param pColumnIdList  required data columns id list
 * @return the block data as an SArray (NOTE(review): element type and ownership not visible here -- confirm)
 */
SArray *tsdbRetrieveDataBlock(TsdbQueryHandleT *pQueryHandle, SArray *pColumnIdList);
/**
 * Get the qualified table id for a super table according to the tag query expression.
 * @param tsdb        tsdb handle
 * @param uid         super table id (formerly documented as "stableid. super table sid")
 * @param key         timestamp key (NOTE(review): its role is not documented here -- confirm)
 * @param pTagCond    tag query condition
 * @param len         NOTE(review): presumably the length of pTagCond in bytes -- confirm
 * @param pGroupList  output: the resulting table group list
 * @param pColIndex   column index array (NOTE(review): presumably the group-by columns -- confirm)
 * @param numOfCols   NOTE(review): presumably the number of entries in pColIndex -- confirm
 * @return NOTE(review): status code; convention not visible in this header -- confirm
 */
int32_t tsdbQuerySTableByTagCond(STsdbRepo *tsdb, uint64_t uid, TSKEY key, const char *pTagCond, size_t len,
STableGroupInfo *pGroupList, SColIndex *pColIndex, int32_t numOfCols);
/**
 * destroy the created table group list, which is generated by tag query
 * @param pGroupList the table group list to destroy
 *        (NOTE(review): whether the STableGroupInfo object itself is freed is not visible here -- confirm)
 */
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
/**
 * create the table group result including only one table, used to handle the normal table query
 *
 * @param tsdb        tsdbHandle
 * @param uid         table uid
 * @param startKey    timestamp key (NOTE(review): presumably stored as the table's STableKeyInfo.lastKey -- confirm)
 * @param pGroupInfo  the generated result
 * @return NOTE(review): status code; convention not visible in this header -- confirm
 */
int32_t tsdbGetOneTableGroup(STsdbRepo *tsdb, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
/**
 * Build a table group from a list of table ids (summary derived from the function name and signature).
 *
 * @param tsdb          tsdb handle
 * @param pTableIdList  list of table ids (NOTE(review): element type not visible here -- confirm)
 * @param pGroupInfo    output: the generated table group
 * @return NOTE(review): status code; convention not visible in this header -- confirm
 */
int32_t tsdbGetTableGroupFromIdList(STsdbRepo *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
/**
 * clean up the query handle
 * @param queryHandle the query handle to clean up
 */
void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
// Re-arm an existing query handle with a new query condition.
void tsdbResetQueryHandle(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond);
// As tsdbResetQueryHandle(), additionally taking a table group list.
void tsdbResetQueryHandleForNewTable(TsdbQueryHandleT queryHandle, STsdbQueryCond *pCond, STableGroupInfo* groupList);
// Fills *pTableBlockInfo with block distribution information.
// NOTE(review): takes TsdbQueryHandleT* (void**), unlike the functions above -- confirm what callers pass.
int32_t tsdbGetFileBlocksDistInfo(TsdbQueryHandleT* queryHandle, STableBlockDist* pTableBlockInfo);
// obtain queryHandle attribute
// NOTE(review): presumably the remaining skip offset (see STsdbQueryCond.offset) -- confirm.
int64_t tsdbSkipOffset(TsdbQueryHandleT queryHandle);
/**
 * get the statistics of repo usage
 * @param repo          pointer to the tsdb repo (declared as void *)
 * @param totalPoints   output: total data points written
 * @param totalStorage  output: total bytes taken by the tsdb
 * @param compStorage   output: total bytes taken by the tsdb after compression
 */
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage);
/**
 * Commit queue set-up and tear-down.
 *
 * Both are declared with an explicit (void) parameter list. In C an empty
 * `()` declares a function with unspecified parameters rather than a
 * prototype, so the compiler would not reject calls that pass arguments.
 * `(void)` has the same meaning in C++, so the extern "C" use of this header
 * is unaffected.
 *
 * @return tsdbInitCommitQueue: int status
 *         (NOTE(review): presumably 0 on success -- confirm against the implementation)
 */
int tsdbInitCommitQueue(void);
void tsdbDestroyCommitQueue(void);
// NOTE(review): presumably performs a commit of `repo` and waits for it to finish -- confirm.
int tsdbSyncCommit(STsdbRepo *repo);
// Increment / decrement a commit reference keyed by `vgId`
// (NOTE(review): presumably the vnode group id -- confirm).
void tsdbIncCommitRef(int vgId);
void tsdbDecCommitRef(int vgId);
// NOTE(review): semantics not visible in this header; takes the query handle by value -- confirm.
void tsdbSwitchTable(TsdbQueryHandleT pQueryHandle);
// For TSDB file sync
// `pRepo` is declared as void * here; `socketFd` uses the project's SOCKET type.
// NOTE(review): return-value convention not visible in this header -- confirm.
int tsdbSyncSend(void *pRepo, SOCKET socketFd);
int tsdbSyncRecv(void *pRepo, SOCKET socketFd);
// For TSDB Compact
// NOTE(review): return-value convention not visible in this header -- confirm.
int tsdbCompact(STsdbRepo *pRepo);
// For TSDB Health Monitor
// returns true when there is no problem
bool tsdbNoProblem(STsdbRepo* pRepo);
// unit of walSize: MB
int tsdbCheckWal(STsdbRepo *pRepo, uint32_t walSize);
// for json tag
// NOTE(review): "Elment" in the name below looks like a misspelling of
// "Element"; it is part of the public symbol and is left unchanged here.
// NOTE(review): `out`/`dst` are presumably caller-provided output buffers and
// `bytes` presumably their size -- confirm against the implementation.
void* getJsonTagValueElment(void* data, char* key, int32_t keyLen, char* out, int16_t bytes);
void getJsonTagValueAll(void* data, void* dst, int16_t bytes);
// NOTE(review): ownership of the returned string is not visible here -- confirm who frees it.
char* parseTagDatatoJson(void *p);
#ifdef __cplusplus
}
#endif
#endif // _TD_TSDB_H_

View File

@ -1,18 +1,9 @@
CMAKE_MINIMUM_REQUIRED(VERSION 3.0...3.20) aux_source_directory(src TSDB_SRC)
PROJECT(TDengine) add_library(tsdb STATIC ${TSDB_SRC})
INCLUDE_DIRECTORIES(inc) target_include_directories(
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/src/query/inc) tsdb
INCLUDE_DIRECTORIES(${TD_COMMUNITY_DIR}/deps/cJson/inc) PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/tsdb"
AUX_SOURCE_DIRECTORY(src SRC) PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
ADD_LIBRARY(tsdb ${SRC}) )
TARGET_LINK_LIBRARIES(tsdb tfs common tutil cJson) target_link_libraries(tsdb os util common tfs)
IF (TD_TSDB_PLUGINS)
TARGET_LINK_LIBRARIES(tsdb tsdbPlugins)
ENDIF ()
IF (TD_LINUX)
# Someone has no gtest directory, so comment it
# ADD_SUBDIRECTORY(tests)
ENDIF ()