Merge pull request #11475 from taosdata/feature/vnode_refact

refactor: vnode code refinement
This commit is contained in:
Hongze Cheng 2022-04-14 10:49:41 +08:00 committed by GitHub
commit 7323c88d00
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 298 additions and 614 deletions

View File

@ -25,82 +25,144 @@
#include "tfs.h"
#include "wal.h"
#include "tcommon.h"
#include "tfs.h"
#include "tmallocator.h"
#include "tmsg.h"
#include "trow.h"
#include "tmallocator.h"
#include "tcommon.h"
#include "tfs.h"
#ifdef __cplusplus
extern "C" {
#endif
/* ------------------------ TYPES EXPOSED ------------------------ */
typedef struct SMgmtWrapper SMgmtWrapper;
typedef struct SVnode SVnode;
// vnode
typedef struct SVnode SVnode;
typedef struct SMetaCfg SMetaCfg; // todo: remove
typedef struct STsdbCfg STsdbCfg; // todo: remove
typedef struct STqCfg STqCfg; // todo: remove
typedef struct SVnodeCfg SVnodeCfg;
int vnodeInit();
void vnodeCleanup();
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg);
void vnodeClose(SVnode *pVnode);
void vnodeDestroy(const char *path);
void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs);
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
// meta
typedef struct SMeta SMeta; // todo: remove
typedef struct SMTbCursor SMTbCursor; // todo: remove
typedef struct SMCtbCursor SMCtbCursor; // todo: remove
typedef struct SMSmaCursor SMSmaCursor; // todo: remove
#define META_SUPER_TABLE TD_SUPER_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
// Types exported
typedef struct SMeta SMeta;
typedef struct SMetaCfg {
/// LRU cache size
uint64_t lruSize;
} SMetaCfg;
typedef struct SMTbCursor SMTbCursor;
typedef struct SMCtbCursor SMCtbCursor;
typedef struct SMSmaCursor SMSmaCursor;
typedef SVCreateTbReq STbCfg;
typedef SVCreateTSmaReq SSmaCfg;
typedef struct SDataStatis {
int16_t colId;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
int64_t sum;
int64_t max;
int64_t min;
} SDataStatis;
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode);
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid);
SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup);
int metaGetTbNum(SMeta *pMeta);
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
char *metaTbCursorNext(SMTbCursor *pTbCur);
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid);
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur);
typedef struct STsdbQueryCond {
STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo *colList;
bool loadExternalRows; // load external rows or not
int32_t type; // data block load type:
} STsdbQueryCond;
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid);
void metaCloseSmaCursor(SMSmaCursor *pSmaCur);
int64_t metaSmaCursorNext(SMSmaCursor *pSmaCur);
typedef struct {
TSKEY lastKey;
uint64_t uid;
} STableKeyInfo;
// tsdb
typedef struct STsdb STsdb;
typedef struct SDataStatis SDataStatis;
typedef struct STsdbQueryCond STsdbQueryCond;
typedef void *tsdbReaderT;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta,
STfs *pTfs);
void tsdbClose(STsdb *);
void tsdbRemove(const char *path);
int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp);
int tsdbPrepareCommit(STsdb *pTsdb);
int tsdbCommit(STsdb *pTsdb);
int32_t tsdbInitSma(STsdb *pTsdb);
int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg);
int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg);
tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId,
uint64_t taskId);
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId,
void *pMemRef);
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT *pReader);
int32_t tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, const char *pTagCond, size_t len,
int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupInfo,
SColIndex *pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId);
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle);
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SDataStatis **pBlockStatis);
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version);
int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg);
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
typedef struct STable {
uint64_t tid;
uint64_t uid;
STSchema *pSchema;
} STable;
// tq
enum {
TQ_STREAM_TOKEN__DATA = 1,
TQ_STREAM_TOKEN__WATERMARK,
TQ_STREAM_TOKEN__CHECKPOINT,
};
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
typedef struct STqReadHandle STqReadHandle;
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
// TYPES EXPOSED
typedef struct STsdb STsdb;
void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList);
int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList);
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
SArray *tqRetrieveDataBlock(STqReadHandle *pHandle);
typedef struct STsdbCfg {
// need to reposition
typedef struct SMgmtWrapper SMgmtWrapper;
int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg);
// structs
struct SMetaCfg {
uint64_t lruSize;
};
struct STsdbCfg {
int8_t precision;
int8_t update;
int8_t compression;
@ -112,15 +174,13 @@ typedef struct STsdbCfg {
int32_t keep2;
uint64_t lruCacheSize;
SArray *retentions;
} STsdbCfg;
};
typedef struct {
// TODO
struct STqCfg {
int32_t reserved;
} STqCfg;
};
typedef struct {
struct SVnodeCfg {
int32_t vgId;
uint64_t dbId;
STfs *pTfs;
@ -140,9 +200,9 @@ typedef struct {
uint32_t hashBegin;
uint32_t hashEnd;
int8_t hashMethod;
} SVnodeCfg;
};
typedef struct {
struct STqReadHandle {
int64_t ver;
int64_t tbUid;
SHashObj *tbIdHash;
@ -155,133 +215,38 @@ typedef struct {
int32_t sver;
SSchemaWrapper *pSchemaWrapper;
STSchema *pSchema;
} STqReadHandle;
/* ------------------------ SVnode ------------------------ */
/**
* @brief Initialize the vnode module
*
* @return int 0 for success and -1 for failure
*/
int vnodeInit();
/**
* @brief Cleanup the vnode module
*
*/
void vnodeCleanup();
/**
* @brief Open a VNODE.
*
* @param path path of the vnode
* @param pVnodeCfg options of the vnode
* @return SVnode* The vnode object
*/
SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg);
/**
* @brief Close a VNODE
*
* @param pVnode The vnode object to close
*/
void vnodeClose(SVnode *pVnode);
/**
* @brief Destroy a VNODE.
*
* @param path Path of the VNODE.
*/
void vnodeDestroy(const char *path);
/**
* @brief Process an array of write messages.
*
* @param pVnode The vnode object.
* @param pMsgs The array of SRpcMsg
*/
void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs);
/**
* @brief Apply a write request message.
*
* @param pVnode The vnode object.
* @param pMsg The request message
* @param pRsp The response message
* @return int 0 for success, -1 for failure
*/
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/**
* @brief Process a consume message.
*
* @param pVnode The vnode object.
* @param pMsg The request message
* @param pRsp The response message
* @return int 0 for success, -1 for failure
*/
int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/**
* @brief Process the sync request
*
* @param pVnode
* @param pMsg
* @param pRsp
* @return int
*/
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp);
/**
* @brief Process a query message.
*
* @param pVnode The vnode object.
* @param pMsg The request message
* @return int 0 for success, -1 for failure
*/
int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg);
/**
* @brief Process a fetch message.
*
* @param pVnode The vnode object.
* @param pMsg The request message
* @return int 0 for success, -1 for failure
*/
int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo);
/* ------------------------ SVnodeCfg ------------------------ */
/**
* @brief Initialize VNODE options.
*
* @param pOptions The options object to be initialized. It should not be NULL.
*/
void vnodeOptionsInit(SVnodeCfg *pOptions);
/**
* @brief Clear VNODE options.
*
* @param pOptions Options to clear.
*/
void vnodeOptionsClear(SVnodeCfg *pOptions);
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName);
/* ------------------------ FOR COMPILE ------------------------ */
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg);
int32_t vnodeCompact(SVnode *pVnode);
int32_t vnodeSync(SVnode *pVnode);
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
/* ------------------------- TQ READ --------------------------- */
enum {
TQ_STREAM_TOKEN__DATA = 1,
TQ_STREAM_TOKEN__WATERMARK,
TQ_STREAM_TOKEN__CHECKPOINT,
};
struct SDataStatis {
int16_t colId;
int16_t maxIndex;
int16_t minIndex;
int16_t numOfNull;
int64_t sum;
int64_t max;
int64_t min;
};
struct STsdbQueryCond {
STimeWindow twindow;
int32_t order; // desc|asc order to iterate the data block
int32_t numOfCols;
SColumnInfo *colList;
bool loadExternalRows; // load external rows or not
int32_t type; // data block load type:
};
typedef struct {
TSKEY lastKey;
uint64_t uid;
} STableKeyInfo;
typedef struct STable {
uint64_t tid;
uint64_t uid;
STSchema *pSchema;
} STable;
typedef struct {
int8_t type;
int8_t reserved[7];
@ -292,294 +257,6 @@ typedef struct {
};
} STqStreamToken;
STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta);
static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList) {
pReadHandle->pColIdList = pColIdList;
}
// static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, int64_t tbUid) {
// pHandle->tbUid = tbUid;
//}
static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) {
if (pHandle->tbIdHash) {
taosHashClear(pHandle->tbIdHash);
}
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pHandle->tbIdHash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i);
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
}
return 0;
}
static FORCE_INLINE int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) {
if (pHandle->tbIdHash == NULL) {
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pHandle->tbIdHash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i);
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
}
return 0;
}
int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver);
bool tqNextDataBlock(STqReadHandle *pHandle);
int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo);
// return SArray<SColumnInfoData>
SArray *tqRetrieveDataBlock(STqReadHandle *pHandle);
// meta.h
SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF);
void metaClose(SMeta *pMeta);
void metaRemove(const char *path);
int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg);
int metaDropTable(SMeta *pMeta, tb_uid_t uid);
int metaCommit(SMeta *pMeta);
int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg);
int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid);
STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid);
STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid);
SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline);
STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver);
void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode);
STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid);
SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup);
int metaGetTbNum(SMeta *pMeta);
SMTbCursor *metaOpenTbCursor(SMeta *pMeta);
void metaCloseTbCursor(SMTbCursor *pTbCur);
char *metaTbCursorNext(SMTbCursor *pTbCur);
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid);
void metaCloseCtbCurosr(SMCtbCursor *pCtbCur);
tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur);
SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid);
void metaCloseSmaCursor(SMSmaCursor *pSmaCur);
int64_t metaSmaCursorNext(SMSmaCursor *pSmaCur);
// Options
void metaOptionsInit(SMetaCfg *pMetaCfg);
void metaOptionsClear(SMetaCfg *pMetaCfg);
// query condition to build multi-table data block iterator
// STsdb
STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, STfs *pTfs);
void tsdbClose(STsdb *);
void tsdbRemove(const char *path);
int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp);
int tsdbPrepareCommit(STsdb *pTsdb);
int tsdbCommit(STsdb *pTsdb);
int32_t tsdbInitSma(STsdb *pTsdb);
int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg);
int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg);
/**
* @brief When submit msg received, update the relative expired window synchronously.
*
* @param pTsdb
* @param pMsg
* @param version
* @return int32_t
*/
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version);
/**
* @brief Insert tSma(Time-range-wise SMA) data from stream computing engine
*
* @param pTsdb
* @param indexUid
* @param msg
* @return int32_t
*/
int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg);
/**
* @brief Drop tSma data and local cache.
*
* @param pTsdb
* @param indexUid
* @return int32_t
*/
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
/**
* @brief Insert RSma(Rollup SMA) data.
*
* @param pTsdb
* @param msg
* @return int32_t
*/
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
// TODO: This is the basic params, and should wrap the params to a queryHandle.
/**
* @brief Get tSma(Time-range-wise SMA) data.
*
* @param pTsdb
* @param pData
* @param indexUid
* @param querySKey
* @param nMaxResult
* @return int32_t
*/
int32_t tsdbGetTSmaData(STsdb *pTsdb, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult);
// STsdbCfg
int tsdbOptionsInit(STsdbCfg *);
void tsdbOptionsClear(STsdbCfg *);
typedef void* tsdbReaderT;
/**
* Get the data block iterator, starting from position according to the query condition
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfoGroup table object list in the form of set, grouped into different sets according to the
* group by condition
* @param qinfo query info handle from query processor
* @return
*/
tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, uint64_t taskId);
/**
* Get the last row of the given query time window for all the tables in STableGroupInfo object.
* Note that only one data block with only row will be returned while invoking retrieve data block function for
* all tables in this group.
*
* @param tsdb tsdb handle
* @param pCond query condition, including time window, result set order, and basic required columns for each block
* @param tableInfo table list.
* @return
*/
//tsdbReaderT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId,
// SMemRef *pRef);
tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef);
int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* pReader, STableBlockDistInfo* pTableBlockInfo);
bool isTsdbCacheLastRow(tsdbReaderT* pReader);
/**
*
* @param tsdb
* @param uid
* @param skey
* @param pTagCond
* @param len
* @param tagNameRelType
* @param tbnameCond
* @param pGroupInfo
* @param pColIndex
* @param numOfCols
* @param reqId
* @return
*/
int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len,
int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo,
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId);
/**
* get num of rows in mem table
*
* @param pHandle
* @return row size
*/
int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle);
/**
* move to next block if exists
*
* @param pTsdbReadHandle
* @return
*/
bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
/**
* Get current data block information
*
* @param pTsdbReadHandle
* @param pBlockInfo
* @return
*/
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
/**
*
* Get the pre-calculated information w.r.t. current data block.
*
* In case of data block in cache, the pBlockStatis will always be NULL.
* If a block is not completed loaded from disk, the pBlockStatis will be NULL.
* @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0
* @return
*/
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SDataStatis **pBlockStatis);
/**
*
* The query condition with primary timestamp is passed to iterator during its constructor function,
* the returned data block must be satisfied with the time window condition in any cases,
* which means the SData data block is not actually the completed disk data blocks.
*
* @param pTsdbReadHandle query handle
* @param pColumnIdList required data columns id list
* @return
*/
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
/**
* destroy the created table group list, which is generated by tag query
* @param pGroupList
*/
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
/**
* create the table group result including only one table, used to handle the normal table query
*
* @param tsdb tsdbHandle
* @param uid table uid
* @param pGroupInfo the generated result
* @return
*/
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
/**
*
* @param tsdb
* @param pTableIdList
* @param pGroupInfo
* @return
*/
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
/**
* clean up the query handle
* @param queryHandle
*/
void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg);
#ifdef __cplusplus
}
#endif

View File

@ -24,6 +24,17 @@ typedef struct SMetaCache SMetaCache;
typedef struct SMetaIdx SMetaIdx;
typedef struct SMetaDB SMetaDB;
SMeta* metaOpen(const char* path, const SMetaCfg* pMetaCfg, SMemAllocatorFactory* pMAF);
void metaClose(SMeta* pMeta);
void metaRemove(const char* path);
int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg);
int metaDropTable(SMeta* pMeta, tb_uid_t uid);
int metaCommit(SMeta* pMeta);
int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg);
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
STbCfg* metaGetTbInfoByUid(SMeta* pMeta, tb_uid_t uid);
STbCfg* metaGetTbInfoByName(SMeta* pMeta, char* tbname, tb_uid_t* uid);
// SMetaDB
int metaOpenDB(SMeta* pMeta);
void metaCloseDB(SMeta* pMeta);

View File

@ -17,6 +17,9 @@
#define _TD_VNODE_DEF_H_
#include "executor.h"
#include "filter.h"
#include "qworker.h"
#include "sync.h"
#include "tchecksum.h"
#include "tcoding.h"
#include "tcompression.h"
@ -25,14 +28,15 @@
#include "tglobal.h"
#include "tlist.h"
#include "tlockfree.h"
#include "tlosertree.h"
#include "tmacro.h"
#include "tmallocator.h"
#include "tskiplist.h"
#include "tstream.h"
#include "ttime.h"
#include "ttimer.h"
#include "vnode.h"
#include "wal.h"
#include "qworker.h"
#ifdef __cplusplus
extern "C" {

View File

@ -16,10 +16,7 @@
#define ALLOW_FORBID_FUNC
#include "db.h"
#include "metaDef.h"
#include "tcoding.h"
#include "thash.h"
#include "vnodeInt.h"
#define IMPL_WITH_LOCK 1
// #if IMPL_WITH_LOCK
@ -262,7 +259,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
return 0;
}
int metaRemoveSmaFromDb(SMeta *pMeta, int64_t indexUid) {
int metaRemoveSmaFromDb(SMeta *pMeta, int64_t indexUid) {
// TODO
#if 0
DBT key = {0};
@ -668,7 +665,7 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) {
}
void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode) {
STSma * pCfg = NULL;
STSma *pCfg = NULL;
SMetaDB *pDB = pMeta->pDB;
DBT key = {0};
DBT value = {0};
@ -711,9 +708,9 @@ static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_
int ret;
void *pBuf;
// SSchema *pSchema;
SSchemaKey schemaKey = {uid, sver, 0};
DBT key = {0};
DBT value = {0};
SSchemaKey schemaKey = {uid, sver, 0};
DBT key = {0};
DBT value = {0};
// Set key/value properties
key.data = &schemaKey;
@ -761,14 +758,14 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
}
int metaGetTbNum(SMeta *pMeta) {
SMetaDB *pDB = pMeta->pDB;
SMetaDB *pDB = pMeta->pDB;
DB_BTREE_STAT *sp1;
pDB->pTbDB->stat(pDB->pNtbIdx, NULL, &sp1, 0);
DB_BTREE_STAT *sp2;
pDB->pTbDB->stat(pDB->pCtbIdx, NULL, &sp2, 0);
return sp1->bt_nkeys + sp2->bt_nkeys;
}

View File

@ -18,11 +18,6 @@
const SMetaCfg defaultMetaOptions = {.lruSize = 0};
/* ------------------------ EXPOSED METHODS ------------------------ */
void metaOptionsInit(SMetaCfg *pMetaOptions) { metaOptionsCopy(pMetaOptions, &defaultMetaOptions); }
void metaOptionsClear(SMetaCfg *pMetaOptions) {
// TODO
}
int metaValidateOptions(const SMetaCfg *pMetaOptions) {
// TODO

View File

@ -11,4 +11,6 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "vnodeInt.h"

View File

@ -15,7 +15,6 @@
#include "vnodeInt.h"
#include "tdbInt.h"
typedef struct SPoolMem {
int64_t size;
struct SPoolMem *prev;
@ -27,18 +26,18 @@ typedef struct SPoolMem {
static SPoolMem *openPool();
static void clearPool(SPoolMem *pPool);
static void closePool(SPoolMem *pPool);
static void * poolMalloc(void *arg, size_t size);
static void *poolMalloc(void *arg, size_t size);
static void poolFree(void *arg, void *ptr);
struct SMetaDB {
TXN txn;
TENV * pEnv;
TDB * pTbDB;
TDB * pSchemaDB;
TDB * pNameIdx;
TDB * pStbIdx;
TDB * pNtbIdx;
TDB * pCtbIdx;
TENV *pEnv;
TDB *pTbDB;
TDB *pSchemaDB;
TDB *pNameIdx;
TDB *pStbIdx;
TDB *pNtbIdx;
TDB *pCtbIdx;
SPoolMem *pPool;
#ifdef META_TDB_SMA_TEST
TDB *pSmaDB;
@ -52,7 +51,7 @@ typedef struct __attribute__((__packed__)) {
} SSchemaDbKey;
typedef struct {
char * name;
char *name;
tb_uid_t uid;
} SNameIdxKey;
@ -251,14 +250,14 @@ void metaCloseDB(SMeta *pMeta) {
int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) {
tb_uid_t uid;
SMetaDB * pMetaDb;
void * pKey;
void * pVal;
SMetaDB *pMetaDb;
void *pKey;
void *pVal;
int kLen;
int vLen;
int ret;
char buf[512];
void * pBuf;
void *pBuf;
SCtbIdxKey ctbIdxKey;
SSchemaDbKey schemaDbKey;
SSchemaWrapper schemaWrapper;
@ -375,11 +374,11 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) {
STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) {
int ret;
SMetaDB *pMetaDb = pMeta->pDB;
void * pKey;
void * pVal;
void *pKey;
void *pVal;
int kLen;
int vLen;
STbCfg * pTbCfg;
STbCfg *pTbCfg;
// Fetch
pKey = &uid;
@ -431,14 +430,14 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo
}
static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx) {
void * pKey;
void * pVal;
void *pKey;
void *pVal;
int kLen;
int vLen;
int ret;
SSchemaDbKey schemaDbKey;
SSchemaWrapper *pSchemaWrapper;
void * pBuf;
void *pBuf;
// fetch
schemaDbKey.uid = uid;
@ -465,9 +464,9 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
tb_uid_t quid;
SSchemaWrapper *pSW;
STSchemaBuilder sb;
SSchemaEx * pSchema;
STSchema * pTSchema;
STbCfg * pTbCfg;
SSchemaEx *pSchema;
STSchema *pTSchema;
STbCfg *pTbCfg;
pTbCfg = metaGetTbInfoByUid(pMeta, uid);
if (pTbCfg->type == META_CHILD_TABLE) {
@ -498,7 +497,7 @@ struct SMTbCursor {
SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
SMTbCursor *pTbCur = NULL;
SMetaDB * pDB = pMeta->pDB;
SMetaDB *pDB = pMeta->pDB;
pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur));
if (pTbCur == NULL) {
@ -520,12 +519,12 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) {
}
char *metaTbCursorNext(SMTbCursor *pTbCur) {
void * pKey = NULL;
void * pVal = NULL;
void *pKey = NULL;
void *pVal = NULL;
int kLen;
int vLen;
int ret;
void * pBuf;
void *pBuf;
STbCfg tbCfg;
for (;;) {
@ -548,17 +547,17 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) {
}
struct SMCtbCursor {
TDBC * pCur;
TDBC *pCur;
tb_uid_t suid;
void * pKey;
void * pVal;
void *pKey;
void *pVal;
int kLen;
int vLen;
};
SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) {
SMCtbCursor *pCtbCur = NULL;
SMetaDB * pDB = pMeta->pDB;
SMetaDB *pDB = pMeta->pDB;
int ret;
pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur));
@ -654,7 +653,6 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) {
continue;
}
++pSW->number;
STSma *tptr = (STSma *)taosMemoryRealloc(pSW->tSma, pSW->number * sizeof(STSma));
if (tptr == NULL) {
@ -709,10 +707,10 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) {
// ASSERT(0);
#ifdef META_TDB_SMA_TEST
int32_t ret = 0;
int32_t ret = 0;
SMetaDB *pMetaDb = pMeta->pDB;
void *pBuf = NULL, *qBuf = NULL;
void *key = {0}, *val = {0};
void *pBuf = NULL, *qBuf = NULL;
void *key = {0}, *val = {0};
// save sma info
int32_t len = tEncodeTSma(NULL, pSmaCfg);
@ -1103,7 +1101,7 @@ static void closePool(SPoolMem *pPool) {
}
static void *poolMalloc(void *arg, size_t size) {
void * ptr = NULL;
void *ptr = NULL;
SPoolMem *pPool = (SPoolMem *)arg;
SPoolMem *pMem;

View File

@ -14,7 +14,6 @@
*/
#include "vnodeInt.h"
#include "tcoding.h"
int metaValidateTbCfg(SMeta *pMeta, const STbCfg *pTbOptions) {
// TODO

View File

@ -11,4 +11,6 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
*/
#include "vnodeInt.h"

View File

@ -13,9 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tcompare.h"
#include "tdatablock.h"
#include "tstream.h"
#include "vnodeInt.h"
int32_t tqInit() { return tqPushMgrInit(); }

View File

@ -12,3 +12,5 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"

View File

@ -14,13 +14,13 @@
*/
#include "vnodeInt.h"
// TODO:replace by an abstract file layer
#include <fcntl.h>
#include <string.h>
#include <unistd.h>
#include "osDir.h"
// #include <fcntl.h>
// #include <string.h>
// #include <unistd.h>
// #include "osDir.h"
#define TQ_META_NAME "tq.meta"
#define TQ_IDX_NAME "tq.idx"
#define TQ_IDX_NAME "tq.idx"
static int32_t tqHandlePutCommitted(STqMetaStore*, int64_t key, void* value);
static void* tqHandleGetUncommitted(STqMetaStore*, int64_t key);

View File

@ -13,8 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdatablock.h"
#include "vnode.h"
#include "vnodeInt.h"
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
STqReadHandle* pReadHandle = taosMemoryMalloc(sizeof(STqReadHandle));
@ -88,7 +87,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo)
pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList);
pBlockInfo->rows = pHandle->pBlock->numOfRows;
// pBlockInfo->uid = pHandle->pBlock->uid; // the uid can not be assigned to pBlockData.
// pBlockInfo->uid = pHandle->pBlock->uid; // the uid can not be assigned to pBlockData.
return 0;
}
@ -177,3 +176,41 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
}
return pArray;
}
void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; }
int tqReadHandleSetTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
if (pHandle->tbIdHash) {
taosHashClear(pHandle->tbIdHash);
}
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pHandle->tbIdHash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
}
return 0;
}
int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) {
if (pHandle->tbIdHash == NULL) {
pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK);
if (pHandle->tbIdHash == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
}
for (int i = 0; i < taosArrayGetSize(tbUidList); i++) {
int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i);
taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0);
}
return 0;
}

View File

@ -16,9 +16,7 @@
#define ALLOW_FORBID_FUNC
#include "db.h"
#include "taoserror.h"
#include "tcoding.h"
#include "thash.h"
#include "vnodeInt.h"
#define IMPL_WITH_LOCK 1
@ -139,7 +137,7 @@ int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *key, uint32_t keySize, void *data,
return 0;
}
void *tsdbGetSmaDataByKey(SDBFile *pDBF, void* key, uint32_t keySize, uint32_t *valueSize) {
void *tsdbGetSmaDataByKey(SDBFile *pDBF, void *key, uint32_t keySize, uint32_t *valueSize) {
void *result = NULL;
DBT key1 = {0};
DBT value1 = {0};

View File

@ -13,9 +13,7 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <regex.h>
#include "vnodeInt.h"
#include "os.h"
typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T;
static const char *tsdbTxnFname[] = {"current.t", "current"};
@ -97,8 +95,8 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) {
return tlen;
}
static void *tsdbDecodeDFileSetArray(STsdb*pRepo, void *buf, SArray *pArray) {
uint64_t nset = 0;
static void *tsdbDecodeDFileSetArray(STsdb *pRepo, void *buf, SArray *pArray) {
uint64_t nset = 0;
taosArrayClear(pArray);
@ -122,7 +120,7 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) {
return tlen;
}
static void *tsdbDecodeFSStatus(STsdb*pRepo, void *buf, SFSStatus *pStatus) {
static void *tsdbDecodeFSStatus(STsdb *pRepo, void *buf, SFSStatus *pStatus) {
tsdbResetFSStatus(pStatus);
// pStatus->pmf = &(pStatus->mf);
@ -407,8 +405,8 @@ int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFil
static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) {
SFSHeader fsheader;
void * pBuf = NULL;
void * ptr;
void *pBuf = NULL;
void *ptr;
char hbuf[TSDB_FILE_HEAD_SIZE] = "\0";
char tfname[TSDB_FILENAME_LEN] = "\0";
char cfname[TSDB_FILENAME_LEN] = "\0";
@ -592,7 +590,7 @@ void tsdbFSIterSeek(SFSIter *pIter, int fid) {
}
SDFileSet *tsdbFSIterNext(SFSIter *pIter) {
STsdbFS * pfs = pIter->pfs;
STsdbFS *pfs = pIter->pfs;
SDFileSet *pSet;
if (pIter->index < 0) {
@ -651,12 +649,12 @@ static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]) {
}
static int tsdbOpenFSFromCurrent(STsdb *pRepo) {
STsdbFS * pfs = REPO_FS(pRepo);
STsdbFS *pfs = REPO_FS(pRepo);
TdFilePtr pFile = NULL;
void * buffer = NULL;
void *buffer = NULL;
SFSHeader fsheader;
char current[TSDB_FILENAME_LEN] = "\0";
void * ptr;
void *ptr;
tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, current);
@ -746,7 +744,7 @@ _err:
// Scan and try to fix incorrect files
static int tsdbScanAndTryFixFS(STsdb *pRepo) {
STsdbFS * pfs = REPO_FS(pRepo);
STsdbFS *pfs = REPO_FS(pRepo);
SFSStatus *pStatus = pfs->cstatus;
// if (tsdbScanAndTryFixMFile(pRepo) < 0) {
@ -908,9 +906,9 @@ static int tsdbScanAndTryFixFS(STsdb *pRepo) {
// }
static int tsdbScanRootDir(STsdb *pRepo) {
char rootDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
STsdbFS * pfs = REPO_FS(pRepo);
char rootDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
STsdbFS *pfs = REPO_FS(pRepo);
const STfsFile *pf;
tsdbGetRootDir(REPO_ID(pRepo), rootDir);
@ -942,9 +940,9 @@ static int tsdbScanRootDir(STsdb *pRepo) {
}
static int tsdbScanDataDir(STsdb *pRepo) {
char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
STsdbFS * pfs = REPO_FS(pRepo);
char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
STsdbFS *pfs = REPO_FS(pRepo);
const STfsFile *pf;
tsdbGetDataDir(REPO_ID(pRepo), dataDir);
@ -1107,14 +1105,14 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf) {
// }
static int tsdbRestoreDFileSet(STsdb *pRepo) {
char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
STfsDir * tdir = NULL;
char dataDir[TSDB_FILENAME_LEN];
char bname[TSDB_FILENAME_LEN];
STfsDir *tdir = NULL;
const STfsFile *pf = NULL;
const char * pattern = "^v[0-9]+f[0-9]+\\.(head|data|last|smad|smal)(-ver[0-9]+)?$";
SArray * fArray = NULL;
regex_t regex;
STsdbFS * pfs = REPO_FS(pRepo);
const char *pattern = "^v[0-9]+f[0-9]+\\.(head|data|last|smad|smal)(-ver[0-9]+)?$";
SArray *fArray = NULL;
regex_t regex;
STsdbFS *pfs = REPO_FS(pRepo);
tsdbGetDataDir(REPO_ID(pRepo), dataDir);
@ -1327,7 +1325,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) {
}
static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired) {
STsdbFS * pfs = REPO_FS(pRepo);
STsdbFS *pfs = REPO_FS(pRepo);
SFSStatus *pStatus = pfs->cstatus;
SDFInfo info;

View File

@ -26,15 +26,6 @@ const STsdbCfg defautlTsdbOptions = {.precision = 0,
.update = 0,
.compression = TWO_STAGE_COMP};
int tsdbOptionsInit(STsdbCfg *pTsdbOptions) {
// TODO
return 0;
}
void tsdbOptionsClear(STsdbCfg *pTsdbOptions) {
// TODO
}
int tsdbValidateOptions(const STsdbCfg *pTsdbOptions) {
// TODO
return 0;

View File

@ -13,18 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "talgo.h"
#include "tcompare.h"
#include "tdatablock.h"
#include "tdataformat.h"
#include "texception.h"
#include "vnodeInt.h"
#include "filter.h"
#include "taosdef.h"
#include "tlosertree.h"
#include "tmsg.h"
#include "vnodeInt.h"
#define EXTRA_BYTES 2

View File

@ -18,13 +18,6 @@
const SVnodeCfg defaultVnodeOptions = {
.wsize = 96 * 1024 * 1024, .ssize = 1 * 1024 * 1024, .lsize = 1024, .walCfg = {.level = TAOS_WAL_WRITE}}; /* TODO */
void vnodeOptionsInit(SVnodeCfg *pVnodeOptions) { /* TODO */
vnodeOptionsCopy(pVnodeOptions, &defaultVnodeOptions);
}
void vnodeOptionsClear(SVnodeCfg *pVnodeOptions) { /* TODO */
}
int vnodeValidateOptions(const SVnodeCfg *pVnodeOptions) {
// TODO
return 0;
@ -36,14 +29,14 @@ void vnodeOptionsCopy(SVnodeCfg *pDest, const SVnodeCfg *pSrc) {
int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) {
uint32_t hashValue = 0;
switch (pVnodeOptions->hashMethod) {
default:
hashValue = MurmurHash3_32(tableFName, strlen(tableFName));
break;
}
// TODO OPEN THIS !!!!!!!
// TODO OPEN THIS !!!!!!!
#if 0
if (hashValue < pVnodeOptions->hashBegin || hashValue > pVnodeOptions->hashEnd) {
terrno = TSDB_CODE_VND_HASH_MISMATCH;
@ -53,5 +46,3 @@ int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) {
return TSDB_CODE_SUCCESS;
}

View File

@ -14,7 +14,6 @@
*/
#define _DEFAULT_SOURCE
#include "sync.h"
#include "vnodeInt.h"
// #include "vnodeInt.h"

View File

@ -14,7 +14,6 @@
*/
#include "vnodeInt.h"
#include "tglobal.h"
SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED};

View File

@ -13,7 +13,6 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "vnodeInt.h"
static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg);

View File

@ -81,7 +81,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
// TODO: to encapsule a free API
taosMemoryFree(vCreateTbReq.stbCfg.pSchema);
taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema);
if(vCreateTbReq.stbCfg.pRSmaParam) {
if (vCreateTbReq.stbCfg.pRSmaParam) {
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds);
taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam);
}
@ -235,13 +235,13 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// TODO
}
// } break;
// case TDMT_VND_CANCEL_SMA: { // timeRangeSMA
// } break;
// case TDMT_VND_DROP_SMA: { // timeRangeSMA
// if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// // TODO
// }
// } break;
// case TDMT_VND_CANCEL_SMA: { // timeRangeSMA
// } break;
// case TDMT_VND_DROP_SMA: { // timeRangeSMA
// if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) {
// // TODO
// }
#if 0
tsdbTSmaSub(pVnode->pTsdb, 1);
SVDropTSmaReq vDropSmaReq = {0};