diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index a93e1741bb..40be3f1299 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -25,82 +25,83 @@ #include "tfs.h" #include "wal.h" +#include "tcommon.h" +#include "tfs.h" #include "tmallocator.h" #include "tmsg.h" #include "trow.h" -#include "tmallocator.h" -#include "tcommon.h" -#include "tfs.h" #ifdef __cplusplus extern "C" { #endif -/* ------------------------ TYPES EXPOSED ------------------------ */ -typedef struct SMgmtWrapper SMgmtWrapper; -typedef struct SVnode SVnode; +// types +typedef struct SVnode SVnode; +typedef struct SMetaCfg SMetaCfg; // todo: remove +typedef struct STsdbCfg STsdbCfg; // todo: remove +typedef struct STqCfg STqCfg; // todo: remove +typedef struct SVnodeCfg SVnodeCfg; +typedef struct STqReadHandle STqReadHandle; +typedef struct SMTbCursor SMTbCursor; // todo: remove +typedef struct SMCtbCursor SMCtbCursor; // todo: remove +typedef struct SMSmaCursor SMSmaCursor; // todo: remove +typedef void *tsdbReaderT; +typedef struct STsdbQueryCond STsdbQueryCond; +typedef struct SDataStatis SDataStatis; -#define META_SUPER_TABLE TD_SUPER_TABLE -#define META_CHILD_TABLE TD_CHILD_TABLE -#define META_NORMAL_TABLE TD_NORMAL_TABLE +// vnode +int vnodeInit(); +void vnodeCleanup(); +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); +void vnodeClose(SVnode *pVnode); +void vnodeDestroy(const char *path); +void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs); +int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); +int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); +int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); +int32_t vnodeCompact(SVnode *pVnode); +int32_t vnodeSync(SVnode *pVnode); +int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); -// Types exported -typedef struct SMeta SMeta; +// meta +typedef struct SMeta SMeta; // todo: remove -typedef struct SMetaCfg { - /// LRU cache size - uint64_t lruSize; -} SMetaCfg; - -typedef struct SMTbCursor SMTbCursor; -typedef struct SMCtbCursor SMCtbCursor; -typedef struct SMSmaCursor SMSmaCursor; - -typedef SVCreateTbReq STbCfg; -typedef SVCreateTSmaReq SSmaCfg; - -typedef struct SDataStatis { - int16_t colId; - int16_t maxIndex; - int16_t minIndex; - int16_t numOfNull; - int64_t sum; - int64_t max; - int64_t min; -} SDataStatis; - -typedef struct STsdbQueryCond { - STimeWindow twindow; - int32_t order; // desc|asc order to iterate the data block - int32_t numOfCols; - SColumnInfo *colList; - bool loadExternalRows; // load external rows or not - int32_t type; // data block load type: -} STsdbQueryCond; - -typedef struct { - TSKEY lastKey; - uint64_t uid; -} STableKeyInfo; - - -typedef struct STable { - uint64_t tid; - uint64_t uid; - STSchema *pSchema; -} STable; - -#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 -#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 -#define BLOCK_LOAD_TABLE_RR_ORDER 3 - -#define TABLE_TID(t) (t)->tid -#define TABLE_UID(t) (t)->uid - -// TYPES EXPOSED +// tsdb typedef struct STsdb STsdb; -typedef struct STsdbCfg { +tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, + uint64_t taskId); +tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, + void *pMemRef); +int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo); +bool isTsdbCacheLastRow(tsdbReaderT *pReader); +int32_t tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, const char *pTagCond, size_t len, + int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupInfo, + SColIndex *pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId); +int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle); +bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); +void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); +int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SDataStatis **pBlockStatis); +SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); +void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); +int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); +int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo); +void tsdbCleanupReadHandle(tsdbReaderT queryHandle); + +// tq + +// need to remove +int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg); + +// structs +struct SMetaCfg { + uint64_t lruSize; +}; + +struct STsdbCfg { int8_t precision; int8_t update; int8_t compression; @@ -112,15 +113,13 @@ typedef struct STsdbCfg { int32_t keep2; uint64_t lruCacheSize; SArray *retentions; -} STsdbCfg; +}; - -typedef struct { - // TODO +struct STqCfg { int32_t reserved; -} STqCfg; +}; -typedef struct { +struct SVnodeCfg { int32_t vgId; uint64_t dbId; STfs *pTfs; @@ -140,9 +139,9 @@ typedef struct { uint32_t hashBegin; uint32_t hashEnd; int8_t hashMethod; -} SVnodeCfg; +}; -typedef struct { +struct STqReadHandle { int64_t ver; int64_t tbUid; SHashObj *tbIdHash; @@ -155,127 +154,62 @@ typedef struct { int32_t sver; SSchemaWrapper *pSchemaWrapper; STSchema *pSchema; -} STqReadHandle; +}; + +// ---------------------------- OLD ---------------------------- +typedef struct SMgmtWrapper SMgmtWrapper; + +#define META_SUPER_TABLE TD_SUPER_TABLE +#define META_CHILD_TABLE TD_CHILD_TABLE +#define META_NORMAL_TABLE TD_NORMAL_TABLE + +// Types exported + +typedef SVCreateTbReq STbCfg; +typedef SVCreateTSmaReq SSmaCfg; + +typedef struct SDataStatis { + int16_t colId; + int16_t maxIndex; + int16_t minIndex; + int16_t numOfNull; + int64_t sum; + int64_t max; + int64_t min; +} SDataStatis; + +struct STsdbQueryCond { + STimeWindow twindow; + int32_t order; // desc|asc order to iterate the data block + int32_t numOfCols; + SColumnInfo *colList; + bool loadExternalRows; // load external rows or not + int32_t type; // data block load type: +}; + +typedef struct { + TSKEY lastKey; + uint64_t uid; +} STableKeyInfo; + +typedef struct STable { + uint64_t tid; + uint64_t uid; + STSchema *pSchema; +} STable; + +#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 +#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 +#define BLOCK_LOAD_TABLE_RR_ORDER 3 + +#define TABLE_TID(t) (t)->tid +#define TABLE_UID(t) (t)->uid /* ------------------------ SVnode ------------------------ */ -/** - * @brief Initialize the vnode module - * - * @return int 0 for success and -1 for failure - */ -int vnodeInit(); - -/** - * @brief Cleanup the vnode module - * - */ -void vnodeCleanup(); - -/** - * @brief Open a VNODE. - * - * @param path path of the vnode - * @param pVnodeCfg options of the vnode - * @return SVnode* The vnode object - */ -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); - -/** - * @brief Close a VNODE - * - * @param pVnode The vnode object to close - */ -void vnodeClose(SVnode *pVnode); - -/** - * @brief Destroy a VNODE. - * - * @param path Path of the VNODE. - */ -void vnodeDestroy(const char *path); - -/** - * @brief Process an array of write messages. - * - * @param pVnode The vnode object. - * @param pMsgs The array of SRpcMsg - */ -void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs); - -/** - * @brief Apply a write request message. - * - * @param pVnode The vnode object. - * @param pMsg The request message - * @param pRsp The response message - * @return int 0 for success, -1 for failure - */ -int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); - -/** - * @brief Process a consume message. - * - * @param pVnode The vnode object. - * @param pMsg The request message - * @param pRsp The response message - * @return int 0 for success, -1 for failure - */ -int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); - -/** - * @brief Process the sync request - * - * @param pVnode - * @param pMsg - * @param pRsp - * @return int - */ -int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); - -/** - * @brief Process a query message. - * - * @param pVnode The vnode object. - * @param pMsg The request message - * @return int 0 for success, -1 for failure - */ -int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); - -/** - * @brief Process a fetch message. - * - * @param pVnode The vnode object. - * @param pMsg The request message - * @return int 0 for success, -1 for failure - */ -int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); - -/* ------------------------ SVnodeCfg ------------------------ */ -/** - * @brief Initialize VNODE options. - * - * @param pOptions The options object to be initialized. It should not be NULL. - */ -void vnodeOptionsInit(SVnodeCfg *pOptions); - -/** - * @brief Clear VNODE options. - * - * @param pOptions Options to clear. - */ -void vnodeOptionsClear(SVnodeCfg *pOptions); - int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); /* ------------------------ FOR COMPILE ------------------------ */ -int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); -int32_t vnodeCompact(SVnode *pVnode); -int32_t vnodeSync(SVnode *pVnode); -int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); - -/* ------------------------- TQ READ --------------------------- */ - enum { TQ_STREAM_TOKEN__DATA = 1, TQ_STREAM_TOKEN__WATERMARK, @@ -378,14 +312,14 @@ void metaOptionsClear(SMetaCfg *pMetaCfg); // query condition to build multi-table data block iterator // STsdb -STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, STfs *pTfs); +STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, + STfs *pTfs); void tsdbClose(STsdb *); void tsdbRemove(const char *path); int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp); int tsdbPrepareCommit(STsdb *pTsdb); int tsdbCommit(STsdb *pTsdb); - int32_t tsdbInitSma(STsdb *pTsdb); int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg); int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg); @@ -411,10 +345,10 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg); /** * @brief Drop tSma data and local cache. - * - * @param pTsdb - * @param indexUid - * @return int32_t + * + * @param pTsdb + * @param indexUid + * @return int32_t */ int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); @@ -427,159 +361,6 @@ int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); */ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); -// TODO: This is the basic params, and should wrap the params to a queryHandle. -/** - * @brief Get tSma(Time-range-wise SMA) data. - * - * @param pTsdb - * @param pData - * @param indexUid - * @param querySKey - * @param nMaxResult - * @return int32_t - */ -int32_t tsdbGetTSmaData(STsdb *pTsdb, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult); - -// STsdbCfg -int tsdbOptionsInit(STsdbCfg *); -void tsdbOptionsClear(STsdbCfg *); - -typedef void* tsdbReaderT; - -/** - * Get the data block iterator, starting from position according to the query condition - * - * @param tsdb tsdb handle - * @param pCond query condition, including time window, result set order, and basic required columns for each block - * @param tableInfoGroup table object list in the form of set, grouped into different sets according to the - * group by condition - * @param qinfo query info handle from query processor - * @return - */ -tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, uint64_t taskId); - -/** - * Get the last row of the given query time window for all the tables in STableGroupInfo object. - * Note that only one data block with only row will be returned while invoking retrieve data block function for - * all tables in this group. - * - * @param tsdb tsdb handle - * @param pCond query condition, including time window, result set order, and basic required columns for each block - * @param tableInfo table list. - * @return - */ -//tsdbReaderT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId, -// SMemRef *pRef); - - -tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef); - -int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* pReader, STableBlockDistInfo* pTableBlockInfo); - -bool isTsdbCacheLastRow(tsdbReaderT* pReader); - -/** - * - * @param tsdb - * @param uid - * @param skey - * @param pTagCond - * @param len - * @param tagNameRelType - * @param tbnameCond - * @param pGroupInfo - * @param pColIndex - * @param numOfCols - * @param reqId - * @return - */ -int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len, - int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo, - SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId); -/** - * get num of rows in mem table - * - * @param pHandle - * @return row size - */ - -int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle); - -/** - * move to next block if exists - * - * @param pTsdbReadHandle - * @return - */ -bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); - -/** - * Get current data block information - * - * @param pTsdbReadHandle - * @param pBlockInfo - * @return - */ -void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); - -/** - * - * Get the pre-calculated information w.r.t. current data block. - * - * In case of data block in cache, the pBlockStatis will always be NULL. - * If a block is not completed loaded from disk, the pBlockStatis will be NULL. - - * @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0 - * @return - */ -int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SDataStatis **pBlockStatis); - -/** - * - * The query condition with primary timestamp is passed to iterator during its constructor function, - * the returned data block must be satisfied with the time window condition in any cases, - * which means the SData data block is not actually the completed disk data blocks. - * - * @param pTsdbReadHandle query handle - * @param pColumnIdList required data columns id list - * @return - */ -SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); - -/** - * destroy the created table group list, which is generated by tag query - * @param pGroupList - */ -void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); - -/** - * create the table group result including only one table, used to handle the normal table query - * - * @param tsdb tsdbHandle - * @param uid table uid - * @param pGroupInfo the generated result - * @return - */ -int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); - -/** - * - * @param tsdb - * @param pTableIdList - * @param pGroupInfo - * @return - */ -int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo); - -/** - * clean up the query handle - * @param queryHandle - */ -void tsdbCleanupReadHandle(tsdbReaderT queryHandle); - -int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg); - - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbOptions.c b/source/dnode/vnode/src/tsdb/tsdbOptions.c index da7a1d393f..2c57a7406e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOptions.c +++ b/source/dnode/vnode/src/tsdb/tsdbOptions.c @@ -26,15 +26,6 @@ const STsdbCfg defautlTsdbOptions = {.precision = 0, .update = 0, .compression = TWO_STAGE_COMP}; -int tsdbOptionsInit(STsdbCfg *pTsdbOptions) { - // TODO - return 0; -} - -void tsdbOptionsClear(STsdbCfg *pTsdbOptions) { - // TODO -} - int tsdbValidateOptions(const STsdbCfg *pTsdbOptions) { // TODO return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 4a70738e86..ef417cabc6 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -18,13 +18,6 @@ const SVnodeCfg defaultVnodeOptions = { .wsize = 96 * 1024 * 1024, .ssize = 1 * 1024 * 1024, .lsize = 1024, .walCfg = {.level = TAOS_WAL_WRITE}}; /* TODO */ -void vnodeOptionsInit(SVnodeCfg *pVnodeOptions) { /* TODO */ - vnodeOptionsCopy(pVnodeOptions, &defaultVnodeOptions); -} - -void vnodeOptionsClear(SVnodeCfg *pVnodeOptions) { /* TODO */ -} - int vnodeValidateOptions(const SVnodeCfg *pVnodeOptions) { // TODO return 0; @@ -36,14 +29,14 @@ void vnodeOptionsCopy(SVnodeCfg *pDest, const SVnodeCfg *pSrc) { int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) { uint32_t hashValue = 0; - + switch (pVnodeOptions->hashMethod) { default: hashValue = MurmurHash3_32(tableFName, strlen(tableFName)); break; } - // TODO OPEN THIS !!!!!!! + // TODO OPEN THIS !!!!!!! #if 0 if (hashValue < pVnodeOptions->hashBegin || hashValue > pVnodeOptions->hashEnd) { terrno = TSDB_CODE_VND_HASH_MISMATCH; @@ -53,5 +46,3 @@ int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) { return TSDB_CODE_SUCCESS; } - -