From b932b5bca64abbb9e427553ed0f0b560c2eceb57 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Apr 2022 02:01:04 +0000 Subject: [PATCH 1/4] refact header file --- source/dnode/vnode/inc/vnode.h | 471 ++++++---------------- source/dnode/vnode/src/tsdb/tsdbOptions.c | 9 - source/dnode/vnode/src/vnd/vnodeCfg.c | 13 +- 3 files changed, 128 insertions(+), 365 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index a93e1741bb..40be3f1299 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -25,82 +25,83 @@ #include "tfs.h" #include "wal.h" +#include "tcommon.h" +#include "tfs.h" #include "tmallocator.h" #include "tmsg.h" #include "trow.h" -#include "tmallocator.h" -#include "tcommon.h" -#include "tfs.h" #ifdef __cplusplus extern "C" { #endif -/* ------------------------ TYPES EXPOSED ------------------------ */ -typedef struct SMgmtWrapper SMgmtWrapper; -typedef struct SVnode SVnode; +// types +typedef struct SVnode SVnode; +typedef struct SMetaCfg SMetaCfg; // todo: remove +typedef struct STsdbCfg STsdbCfg; // todo: remove +typedef struct STqCfg STqCfg; // todo: remove +typedef struct SVnodeCfg SVnodeCfg; +typedef struct STqReadHandle STqReadHandle; +typedef struct SMTbCursor SMTbCursor; // todo: remove +typedef struct SMCtbCursor SMCtbCursor; // todo: remove +typedef struct SMSmaCursor SMSmaCursor; // todo: remove +typedef void *tsdbReaderT; +typedef struct STsdbQueryCond STsdbQueryCond; +typedef struct SDataStatis SDataStatis; -#define META_SUPER_TABLE TD_SUPER_TABLE -#define META_CHILD_TABLE TD_CHILD_TABLE -#define META_NORMAL_TABLE TD_NORMAL_TABLE +// vnode +int vnodeInit(); +void vnodeCleanup(); +SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); +void vnodeClose(SVnode *pVnode); +void vnodeDestroy(const char *path); +void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs); +int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); +int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); +int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); +int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); +int32_t vnodeCompact(SVnode *pVnode); +int32_t vnodeSync(SVnode *pVnode); +int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); -// Types exported -typedef struct SMeta SMeta; +// meta +typedef struct SMeta SMeta; // todo: remove -typedef struct SMetaCfg { - /// LRU cache size - uint64_t lruSize; -} SMetaCfg; - -typedef struct SMTbCursor SMTbCursor; -typedef struct SMCtbCursor SMCtbCursor; -typedef struct SMSmaCursor SMSmaCursor; - -typedef SVCreateTbReq STbCfg; -typedef SVCreateTSmaReq SSmaCfg; - -typedef struct SDataStatis { - int16_t colId; - int16_t maxIndex; - int16_t minIndex; - int16_t numOfNull; - int64_t sum; - int64_t max; - int64_t min; -} SDataStatis; - -typedef struct STsdbQueryCond { - STimeWindow twindow; - int32_t order; // desc|asc order to iterate the data block - int32_t numOfCols; - SColumnInfo *colList; - bool loadExternalRows; // load external rows or not - int32_t type; // data block load type: -} STsdbQueryCond; - -typedef struct { - TSKEY lastKey; - uint64_t uid; -} STableKeyInfo; - - -typedef struct STable { - uint64_t tid; - uint64_t uid; - STSchema *pSchema; -} STable; - -#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 -#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 -#define BLOCK_LOAD_TABLE_RR_ORDER 3 - -#define TABLE_TID(t) (t)->tid -#define TABLE_UID(t) (t)->uid - -// TYPES EXPOSED +// tsdb typedef struct STsdb STsdb; -typedef struct STsdbCfg { +tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, + uint64_t taskId); +tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, + void *pMemRef); +int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT *pReader, STableBlockDistInfo *pTableBlockInfo); +bool isTsdbCacheLastRow(tsdbReaderT *pReader); +int32_t tsdbQuerySTableByTagCond(void *pMeta, uint64_t uid, TSKEY skey, const char *pTagCond, size_t len, + int16_t tagNameRelType, const char *tbnameCond, STableGroupInfo *pGroupInfo, + SColIndex *pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId); +int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT *pHandle); +bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); +void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); +int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SDataStatis **pBlockStatis); +SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); +void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); +int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); +int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo); +void tsdbCleanupReadHandle(tsdbReaderT queryHandle); + +// tq + +// need to remove +int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg); + +// structs +struct SMetaCfg { + uint64_t lruSize; +}; + +struct STsdbCfg { int8_t precision; int8_t update; int8_t compression; @@ -112,15 +113,13 @@ typedef struct STsdbCfg { int32_t keep2; uint64_t lruCacheSize; SArray *retentions; -} STsdbCfg; +}; - -typedef struct { - // TODO +struct STqCfg { int32_t reserved; -} STqCfg; +}; -typedef struct { +struct SVnodeCfg { int32_t vgId; uint64_t dbId; STfs *pTfs; @@ -140,9 +139,9 @@ typedef struct { uint32_t hashBegin; uint32_t hashEnd; int8_t hashMethod; -} SVnodeCfg; +}; -typedef struct { +struct STqReadHandle { int64_t ver; int64_t tbUid; SHashObj *tbIdHash; @@ -155,127 +154,62 @@ typedef struct { int32_t sver; SSchemaWrapper *pSchemaWrapper; STSchema *pSchema; -} STqReadHandle; +}; + +// ---------------------------- OLD ---------------------------- +typedef struct SMgmtWrapper SMgmtWrapper; + +#define META_SUPER_TABLE TD_SUPER_TABLE +#define META_CHILD_TABLE TD_CHILD_TABLE +#define META_NORMAL_TABLE TD_NORMAL_TABLE + +// Types exported + +typedef SVCreateTbReq STbCfg; +typedef SVCreateTSmaReq SSmaCfg; + +typedef struct SDataStatis { + int16_t colId; + int16_t maxIndex; + int16_t minIndex; + int16_t numOfNull; + int64_t sum; + int64_t max; + int64_t min; +} SDataStatis; + +struct STsdbQueryCond { + STimeWindow twindow; + int32_t order; // desc|asc order to iterate the data block + int32_t numOfCols; + SColumnInfo *colList; + bool loadExternalRows; // load external rows or not + int32_t type; // data block load type: +}; + +typedef struct { + TSKEY lastKey; + uint64_t uid; +} STableKeyInfo; + +typedef struct STable { + uint64_t tid; + uint64_t uid; + STSchema *pSchema; +} STable; + +#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 +#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 +#define BLOCK_LOAD_TABLE_RR_ORDER 3 + +#define TABLE_TID(t) (t)->tid +#define TABLE_UID(t) (t)->uid /* ------------------------ SVnode ------------------------ */ -/** - * @brief Initialize the vnode module - * - * @return int 0 for success and -1 for failure - */ -int vnodeInit(); - -/** - * @brief Cleanup the vnode module - * - */ -void vnodeCleanup(); - -/** - * @brief Open a VNODE. - * - * @param path path of the vnode - * @param pVnodeCfg options of the vnode - * @return SVnode* The vnode object - */ -SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); - -/** - * @brief Close a VNODE - * - * @param pVnode The vnode object to close - */ -void vnodeClose(SVnode *pVnode); - -/** - * @brief Destroy a VNODE. - * - * @param path Path of the VNODE. - */ -void vnodeDestroy(const char *path); - -/** - * @brief Process an array of write messages. - * - * @param pVnode The vnode object. - * @param pMsgs The array of SRpcMsg - */ -void vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs); - -/** - * @brief Apply a write request message. - * - * @param pVnode The vnode object. - * @param pMsg The request message - * @param pRsp The response message - * @return int 0 for success, -1 for failure - */ -int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); - -/** - * @brief Process a consume message. - * - * @param pVnode The vnode object. - * @param pMsg The request message - * @param pRsp The response message - * @return int 0 for success, -1 for failure - */ -int vnodeProcessCMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); - -/** - * @brief Process the sync request - * - * @param pVnode - * @param pMsg - * @param pRsp - * @return int - */ -int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp); - -/** - * @brief Process a query message. - * - * @param pVnode The vnode object. - * @param pMsg The request message - * @return int 0 for success, -1 for failure - */ -int vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg); - -/** - * @brief Process a fetch message. - * - * @param pVnode The vnode object. - * @param pMsg The request message - * @return int 0 for success, -1 for failure - */ -int vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo); - -/* ------------------------ SVnodeCfg ------------------------ */ -/** - * @brief Initialize VNODE options. - * - * @param pOptions The options object to be initialized. It should not be NULL. - */ -void vnodeOptionsInit(SVnodeCfg *pOptions); - -/** - * @brief Clear VNODE options. - * - * @param pOptions Options to clear. - */ -void vnodeOptionsClear(SVnodeCfg *pOptions); - int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); /* ------------------------ FOR COMPILE ------------------------ */ -int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); -int32_t vnodeCompact(SVnode *pVnode); -int32_t vnodeSync(SVnode *pVnode); -int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); - -/* ------------------------- TQ READ --------------------------- */ - enum { TQ_STREAM_TOKEN__DATA = 1, TQ_STREAM_TOKEN__WATERMARK, @@ -378,14 +312,14 @@ void metaOptionsClear(SMetaCfg *pMetaCfg); // query condition to build multi-table data block iterator // STsdb -STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, STfs *pTfs); +STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, + STfs *pTfs); void tsdbClose(STsdb *); void tsdbRemove(const char *path); int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp); int tsdbPrepareCommit(STsdb *pTsdb); int tsdbCommit(STsdb *pTsdb); - int32_t tsdbInitSma(STsdb *pTsdb); int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg); int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg); @@ -411,10 +345,10 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg); /** * @brief Drop tSma data and local cache. - * - * @param pTsdb - * @param indexUid - * @return int32_t + * + * @param pTsdb + * @param indexUid + * @return int32_t */ int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); @@ -427,159 +361,6 @@ int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); */ int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); -// TODO: This is the basic params, and should wrap the params to a queryHandle. -/** - * @brief Get tSma(Time-range-wise SMA) data. - * - * @param pTsdb - * @param pData - * @param indexUid - * @param querySKey - * @param nMaxResult - * @return int32_t - */ -int32_t tsdbGetTSmaData(STsdb *pTsdb, char *pData, int64_t indexUid, TSKEY querySKey, int32_t nMaxResult); - -// STsdbCfg -int tsdbOptionsInit(STsdbCfg *); -void tsdbOptionsClear(STsdbCfg *); - -typedef void* tsdbReaderT; - -/** - * Get the data block iterator, starting from position according to the query condition - * - * @param tsdb tsdb handle - * @param pCond query condition, including time window, result set order, and basic required columns for each block - * @param tableInfoGroup table object list in the form of set, grouped into different sets according to the - * group by condition - * @param qinfo query info handle from query processor - * @return - */ -tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, uint64_t taskId); - -/** - * Get the last row of the given query time window for all the tables in STableGroupInfo object. - * Note that only one data block with only row will be returned while invoking retrieve data block function for - * all tables in this group. - * - * @param tsdb tsdb handle - * @param pCond query condition, including time window, result set order, and basic required columns for each block - * @param tableInfo table list. - * @return - */ -//tsdbReaderT tsdbQueryLastRow(STsdbRepo *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfo, uint64_t qId, -// SMemRef *pRef); - - -tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, void* pMemRef); - -int32_t tsdbGetFileBlocksDistInfo(tsdbReaderT* pReader, STableBlockDistInfo* pTableBlockInfo); - -bool isTsdbCacheLastRow(tsdbReaderT* pReader); - -/** - * - * @param tsdb - * @param uid - * @param skey - * @param pTagCond - * @param len - * @param tagNameRelType - * @param tbnameCond - * @param pGroupInfo - * @param pColIndex - * @param numOfCols - * @param reqId - * @return - */ -int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const char* pTagCond, size_t len, - int16_t tagNameRelType, const char* tbnameCond, STableGroupInfo* pGroupInfo, - SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId); -/** - * get num of rows in mem table - * - * @param pHandle - * @return row size - */ - -int64_t tsdbGetNumOfRowsInMemTable(tsdbReaderT* pHandle); - -/** - * move to next block if exists - * - * @param pTsdbReadHandle - * @return - */ -bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); - -/** - * Get current data block information - * - * @param pTsdbReadHandle - * @param pBlockInfo - * @return - */ -void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); - -/** - * - * Get the pre-calculated information w.r.t. current data block. - * - * In case of data block in cache, the pBlockStatis will always be NULL. - * If a block is not completed loaded from disk, the pBlockStatis will be NULL. - - * @pBlockStatis the pre-calculated value for current data blocks. if the block is a cache block, always return 0 - * @return - */ -int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SDataStatis **pBlockStatis); - -/** - * - * The query condition with primary timestamp is passed to iterator during its constructor function, - * the returned data block must be satisfied with the time window condition in any cases, - * which means the SData data block is not actually the completed disk data blocks. - * - * @param pTsdbReadHandle query handle - * @param pColumnIdList required data columns id list - * @return - */ -SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); - -/** - * destroy the created table group list, which is generated by tag query - * @param pGroupList - */ -void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); - -/** - * create the table group result including only one table, used to handle the normal table query - * - * @param tsdb tsdbHandle - * @param uid table uid - * @param pGroupInfo the generated result - * @return - */ -int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); - -/** - * - * @param tsdb - * @param pTableIdList - * @param pGroupInfo - * @return - */ -int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo); - -/** - * clean up the query handle - * @param queryHandle - */ -void tsdbCleanupReadHandle(tsdbReaderT queryHandle); - -int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg); - - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/tsdb/tsdbOptions.c b/source/dnode/vnode/src/tsdb/tsdbOptions.c index da7a1d393f..2c57a7406e 100644 --- a/source/dnode/vnode/src/tsdb/tsdbOptions.c +++ b/source/dnode/vnode/src/tsdb/tsdbOptions.c @@ -26,15 +26,6 @@ const STsdbCfg defautlTsdbOptions = {.precision = 0, .update = 0, .compression = TWO_STAGE_COMP}; -int tsdbOptionsInit(STsdbCfg *pTsdbOptions) { - // TODO - return 0; -} - -void tsdbOptionsClear(STsdbCfg *pTsdbOptions) { - // TODO -} - int tsdbValidateOptions(const STsdbCfg *pTsdbOptions) { // TODO return 0; diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 4a70738e86..ef417cabc6 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -18,13 +18,6 @@ const SVnodeCfg defaultVnodeOptions = { .wsize = 96 * 1024 * 1024, .ssize = 1 * 1024 * 1024, .lsize = 1024, .walCfg = {.level = TAOS_WAL_WRITE}}; /* TODO */ -void vnodeOptionsInit(SVnodeCfg *pVnodeOptions) { /* TODO */ - vnodeOptionsCopy(pVnodeOptions, &defaultVnodeOptions); -} - -void vnodeOptionsClear(SVnodeCfg *pVnodeOptions) { /* TODO */ -} - int vnodeValidateOptions(const SVnodeCfg *pVnodeOptions) { // TODO return 0; @@ -36,14 +29,14 @@ void vnodeOptionsCopy(SVnodeCfg *pDest, const SVnodeCfg *pSrc) { int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) { uint32_t hashValue = 0; - + switch (pVnodeOptions->hashMethod) { default: hashValue = MurmurHash3_32(tableFName, strlen(tableFName)); break; } - // TODO OPEN THIS !!!!!!! + // TODO OPEN THIS !!!!!!! #if 0 if (hashValue < pVnodeOptions->hashBegin || hashValue > pVnodeOptions->hashEnd) { terrno = TSDB_CODE_VND_HASH_MISMATCH; @@ -53,5 +46,3 @@ int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName) { return TSDB_CODE_SUCCESS; } - - From 272d0ec74adefc02b963d734a156b5b1b1215292 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Apr 2022 02:10:56 +0000 Subject: [PATCH 2/4] refact vnode --- source/dnode/vnode/inc/vnode.h | 193 +++++++++++------------------ source/dnode/vnode/src/tq/tqRead.c | 40 +++++- 2 files changed, 113 insertions(+), 120 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 40be3f1299..80fcde0f70 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -35,21 +35,13 @@ extern "C" { #endif -// types -typedef struct SVnode SVnode; -typedef struct SMetaCfg SMetaCfg; // todo: remove -typedef struct STsdbCfg STsdbCfg; // todo: remove -typedef struct STqCfg STqCfg; // todo: remove -typedef struct SVnodeCfg SVnodeCfg; -typedef struct STqReadHandle STqReadHandle; -typedef struct SMTbCursor SMTbCursor; // todo: remove -typedef struct SMCtbCursor SMCtbCursor; // todo: remove -typedef struct SMSmaCursor SMSmaCursor; // todo: remove -typedef void *tsdbReaderT; -typedef struct STsdbQueryCond STsdbQueryCond; -typedef struct SDataStatis SDataStatis; - // vnode +typedef struct SVnode SVnode; +typedef struct SMetaCfg SMetaCfg; // todo: remove +typedef struct STsdbCfg STsdbCfg; // todo: remove +typedef struct STqCfg STqCfg; // todo: remove +typedef struct SVnodeCfg SVnodeCfg; + int vnodeInit(); void vnodeCleanup(); SVnode *vnodeOpen(const char *path, const SVnodeCfg *pVnodeCfg); @@ -65,12 +57,59 @@ int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg); int32_t vnodeCompact(SVnode *pVnode); int32_t vnodeSync(SVnode *pVnode); int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); +int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); // meta -typedef struct SMeta SMeta; // todo: remove +typedef struct SMeta SMeta; // todo: remove +typedef struct SMTbCursor SMTbCursor; // todo: remove +typedef struct SMCtbCursor SMCtbCursor; // todo: remove +typedef struct SMSmaCursor SMSmaCursor; // todo: remove + +#define META_SUPER_TABLE TD_SUPER_TABLE +#define META_CHILD_TABLE TD_CHILD_TABLE +#define META_NORMAL_TABLE TD_NORMAL_TABLE + +typedef SVCreateTbReq STbCfg; +typedef SVCreateTSmaReq SSmaCfg; + +SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF); +void metaClose(SMeta *pMeta); +void metaRemove(const char *path); +int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg); +int metaDropTable(SMeta *pMeta, tb_uid_t uid); +int metaCommit(SMeta *pMeta); +int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg); +int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid); +STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); +STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); +SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); +STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); +void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode); +STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid); +SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup); +int metaGetTbNum(SMeta *pMeta); +SMTbCursor *metaOpenTbCursor(SMeta *pMeta); +void metaCloseTbCursor(SMTbCursor *pTbCur); +char *metaTbCursorNext(SMTbCursor *pTbCur); +SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid); +void metaCloseCtbCurosr(SMCtbCursor *pCtbCur); +tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur); + +SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid); +void metaCloseSmaCursor(SMSmaCursor *pSmaCur); +int64_t metaSmaCursorNext(SMSmaCursor *pSmaCur); // tsdb -typedef struct STsdb STsdb; +typedef struct STsdb STsdb; +typedef struct SDataStatis SDataStatis; +typedef struct STsdbQueryCond STsdbQueryCond; +typedef void *tsdbReaderT; + +#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 +#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 +#define BLOCK_LOAD_TABLE_RR_ORDER 3 +#define TABLE_TID(t) (t)->tid +#define TABLE_UID(t) (t)->uid tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, uint64_t taskId); @@ -92,6 +131,23 @@ int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STab void tsdbCleanupReadHandle(tsdbReaderT queryHandle); // tq +enum { + TQ_STREAM_TOKEN__DATA = 1, + TQ_STREAM_TOKEN__WATERMARK, + TQ_STREAM_TOKEN__CHECKPOINT, +}; + +typedef struct STqReadHandle STqReadHandle; + +STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); + +void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList); +int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); +int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList); +int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); +bool tqNextDataBlock(STqReadHandle *pHandle); +int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo); +SArray *tqRetrieveDataBlock(STqReadHandle *pHandle); // need to remove int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg); @@ -159,16 +215,9 @@ struct STqReadHandle { // ---------------------------- OLD ---------------------------- typedef struct SMgmtWrapper SMgmtWrapper; -#define META_SUPER_TABLE TD_SUPER_TABLE -#define META_CHILD_TABLE TD_CHILD_TABLE -#define META_NORMAL_TABLE TD_NORMAL_TABLE - // Types exported -typedef SVCreateTbReq STbCfg; -typedef SVCreateTSmaReq SSmaCfg; - -typedef struct SDataStatis { +struct SDataStatis { int16_t colId; int16_t maxIndex; int16_t minIndex; @@ -176,7 +225,7 @@ typedef struct SDataStatis { int64_t sum; int64_t max; int64_t min; -} SDataStatis; +}; struct STsdbQueryCond { STimeWindow twindow; @@ -198,24 +247,8 @@ typedef struct STable { STSchema *pSchema; } STable; -#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 -#define BLOCK_LOAD_TABLE_SEQ_ORDER 2 -#define BLOCK_LOAD_TABLE_RR_ORDER 3 - -#define TABLE_TID(t) (t)->tid -#define TABLE_UID(t) (t)->uid - -/* ------------------------ SVnode ------------------------ */ -int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); - /* ------------------------ FOR COMPILE ------------------------ */ -enum { - TQ_STREAM_TOKEN__DATA = 1, - TQ_STREAM_TOKEN__WATERMARK, - TQ_STREAM_TOKEN__CHECKPOINT, -}; - typedef struct { int8_t type; int8_t reserved[7]; @@ -226,85 +259,7 @@ typedef struct { }; } STqStreamToken; -STqReadHandle *tqInitSubmitMsgScanner(SMeta *pMeta); - -static FORCE_INLINE void tqReadHandleSetColIdList(STqReadHandle *pReadHandle, SArray *pColIdList) { - pReadHandle->pColIdList = pColIdList; -} - -// static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, int64_t tbUid) { -// pHandle->tbUid = tbUid; -//} - -static FORCE_INLINE int tqReadHandleSetTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) { - if (pHandle->tbIdHash) { - taosHashClear(pHandle->tbIdHash); - } - - pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if (pHandle->tbIdHash == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - - for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { - int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i); - taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); - } - - return 0; -} - -static FORCE_INLINE int tqReadHandleAddTbUidList(STqReadHandle *pHandle, const SArray *tbUidList) { - if (pHandle->tbIdHash == NULL) { - pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); - if (pHandle->tbIdHash == NULL) { - terrno = TSDB_CODE_OUT_OF_MEMORY; - return -1; - } - } - - for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { - int64_t *pKey = (int64_t *)taosArrayGet(tbUidList, i); - taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); - } - - return 0; -} - -int32_t tqReadHandleSetMsg(STqReadHandle *pHandle, SSubmitReq *pMsg, int64_t ver); -bool tqNextDataBlock(STqReadHandle *pHandle); -int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo); -// return SArray -SArray *tqRetrieveDataBlock(STqReadHandle *pHandle); - // meta.h -SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF); -void metaClose(SMeta *pMeta); -void metaRemove(const char *path); -int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg); -int metaDropTable(SMeta *pMeta, tb_uid_t uid); -int metaCommit(SMeta *pMeta); -int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg); -int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid); -STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); -STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); -SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); -STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); -void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode); -STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid); -SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup); -int metaGetTbNum(SMeta *pMeta); -SMTbCursor *metaOpenTbCursor(SMeta *pMeta); -void metaCloseTbCursor(SMTbCursor *pTbCur); -char *metaTbCursorNext(SMTbCursor *pTbCur); -SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid); -void metaCloseCtbCurosr(SMCtbCursor *pCtbCur); -tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur); - -SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid); -void metaCloseSmaCursor(SMSmaCursor *pSmaCur); -int64_t metaSmaCursorNext(SMSmaCursor *pSmaCur); // Options void metaOptionsInit(SMetaCfg *pMetaCfg); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index c69f8ce09a..21dc6e35e8 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -88,7 +88,7 @@ int tqRetrieveDataBlockInfo(STqReadHandle* pHandle, SDataBlockInfo* pBlockInfo) pBlockInfo->numOfCols = taosArrayGetSize(pHandle->pColIdList); pBlockInfo->rows = pHandle->pBlock->numOfRows; -// pBlockInfo->uid = pHandle->pBlock->uid; // the uid can not be assigned to pBlockData. + // pBlockInfo->uid = pHandle->pBlock->uid; // the uid can not be assigned to pBlockData. return 0; } @@ -177,3 +177,41 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) { } return pArray; } + +void tqReadHandleSetColIdList(STqReadHandle* pReadHandle, SArray* pColIdList) { pReadHandle->pColIdList = pColIdList; } + +int tqReadHandleSetTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { + if (pHandle->tbIdHash) { + taosHashClear(pHandle->tbIdHash); + } + + pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + if (pHandle->tbIdHash == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { + int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i); + taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); + } + + return 0; +} + +int tqReadHandleAddTbUidList(STqReadHandle* pHandle, const SArray* tbUidList) { + if (pHandle->tbIdHash == NULL) { + pHandle->tbIdHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_NO_LOCK); + if (pHandle->tbIdHash == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + } + + for (int i = 0; i < taosArrayGetSize(tbUidList); i++) { + int64_t* pKey = (int64_t*)taosArrayGet(tbUidList, i); + taosHashPut(pHandle->tbIdHash, pKey, sizeof(int64_t), NULL, 0); + } + + return 0; +} From 10b8756c6bdabd45e598359dfefe0f47bcb921f2 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Apr 2022 02:13:47 +0000 Subject: [PATCH 3/4] refact vnode --- source/dnode/vnode/inc/vnode.h | 83 ++++++--------------------- source/dnode/vnode/src/meta/metaCfg.c | 5 -- 2 files changed, 17 insertions(+), 71 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 80fcde0f70..29ed075030 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -110,7 +110,16 @@ typedef void *tsdbReaderT; #define BLOCK_LOAD_TABLE_RR_ORDER 3 #define TABLE_TID(t) (t)->tid #define TABLE_UID(t) (t)->uid - +STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, + STfs *pTfs); +void tsdbClose(STsdb *); +void tsdbRemove(const char *path); +int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp); +int tsdbPrepareCommit(STsdb *pTsdb); +int tsdbCommit(STsdb *pTsdb); +int32_t tsdbInitSma(STsdb *pTsdb); +int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg); +int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg); tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, uint64_t taskId); tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, @@ -129,6 +138,10 @@ void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo); void tsdbCleanupReadHandle(tsdbReaderT queryHandle); +int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version); +int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg); +int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); +int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); // tq enum { @@ -149,7 +162,9 @@ bool tqNextDataBlock(STqReadHandle *pHandle); int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockInfo); SArray *tqRetrieveDataBlock(STqReadHandle *pHandle); -// need to remove +// need to reposition +typedef struct SMgmtWrapper SMgmtWrapper; + int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg); // structs @@ -212,11 +227,6 @@ struct STqReadHandle { STSchema *pSchema; }; -// ---------------------------- OLD ---------------------------- -typedef struct SMgmtWrapper SMgmtWrapper; - -// Types exported - struct SDataStatis { int16_t colId; int16_t maxIndex; @@ -247,8 +257,6 @@ typedef struct STable { STSchema *pSchema; } STable; -/* ------------------------ FOR COMPILE ------------------------ */ - typedef struct { int8_t type; int8_t reserved[7]; @@ -259,63 +267,6 @@ typedef struct { }; } STqStreamToken; -// meta.h - -// Options -void metaOptionsInit(SMetaCfg *pMetaCfg); -void metaOptionsClear(SMetaCfg *pMetaCfg); - -// query condition to build multi-table data block iterator -// STsdb -STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, - STfs *pTfs); -void tsdbClose(STsdb *); -void tsdbRemove(const char *path); -int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp); -int tsdbPrepareCommit(STsdb *pTsdb); -int tsdbCommit(STsdb *pTsdb); - -int32_t tsdbInitSma(STsdb *pTsdb); -int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg); -int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg); -/** - * @brief When submit msg received, update the relative expired window synchronously. - * - * @param pTsdb - * @param pMsg - * @param version - * @return int32_t - */ -int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version); - -/** - * @brief Insert tSma(Time-range-wise SMA) data from stream computing engine - * - * @param pTsdb - * @param indexUid - * @param msg - * @return int32_t - */ -int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg); - -/** - * @brief Drop tSma data and local cache. - * - * @param pTsdb - * @param indexUid - * @return int32_t - */ -int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); - -/** - * @brief Insert RSma(Rollup SMA) data. - * - * @param pTsdb - * @param msg - * @return int32_t - */ -int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/meta/metaCfg.c b/source/dnode/vnode/src/meta/metaCfg.c index a5fcb32698..371c20f157 100644 --- a/source/dnode/vnode/src/meta/metaCfg.c +++ b/source/dnode/vnode/src/meta/metaCfg.c @@ -18,11 +18,6 @@ const SMetaCfg defaultMetaOptions = {.lruSize = 0}; /* ------------------------ EXPOSED METHODS ------------------------ */ -void metaOptionsInit(SMetaCfg *pMetaOptions) { metaOptionsCopy(pMetaOptions, &defaultMetaOptions); } - -void metaOptionsClear(SMetaCfg *pMetaOptions) { - // TODO -} int metaValidateOptions(const SMetaCfg *pMetaOptions) { // TODO From bb6806b59e026c42614611dbb5402756d2358230 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Apr 2022 02:25:49 +0000 Subject: [PATCH 4/4] refact vnode --- source/dnode/vnode/inc/vnode.h | 10 ---- source/dnode/vnode/src/inc/meta.h | 11 ++++ source/dnode/vnode/src/inc/vnodeInt.h | 6 +- source/dnode/vnode/src/meta/metaBDBImpl.c | 21 +++---- source/dnode/vnode/src/meta/metaQuery.c | 4 +- source/dnode/vnode/src/meta/metaTDBImpl.c | 70 +++++++++++------------ source/dnode/vnode/src/meta/metaTbCfg.c | 1 - source/dnode/vnode/src/meta/metaTbTag.c | 4 +- source/dnode/vnode/src/tq/tq.c | 3 - source/dnode/vnode/src/tq/tqCommit.c | 2 + source/dnode/vnode/src/tq/tqMetaStore.c | 10 ++-- source/dnode/vnode/src/tq/tqRead.c | 3 +- source/dnode/vnode/src/tsdb/tsdbBDBImpl.c | 6 +- source/dnode/vnode/src/tsdb/tsdbFS.c | 50 ++++++++-------- source/dnode/vnode/src/tsdb/tsdbRead.c | 12 ---- source/dnode/vnode/src/vnd/vnodeInt.c | 1 - source/dnode/vnode/src/vnd/vnodeMgr.c | 1 - source/dnode/vnode/src/vnd/vnodeQuery.c | 1 - source/dnode/vnode/src/vnd/vnodeWrite.c | 16 +++--- 19 files changed, 107 insertions(+), 125 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 29ed075030..6e85c4df64 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -72,16 +72,6 @@ typedef struct SMSmaCursor SMSmaCursor; // todo: remove typedef SVCreateTbReq STbCfg; typedef SVCreateTSmaReq SSmaCfg; -SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF); -void metaClose(SMeta *pMeta); -void metaRemove(const char *path); -int metaCreateTable(SMeta *pMeta, STbCfg *pTbCfg); -int metaDropTable(SMeta *pMeta, tb_uid_t uid); -int metaCommit(SMeta *pMeta); -int32_t metaCreateTSma(SMeta *pMeta, SSmaCfg *pCfg); -int32_t metaDropTSma(SMeta *pMeta, int64_t indexUid); -STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid); -STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid); SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode); diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index b04364daf8..b3553f5d2d 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -24,6 +24,17 @@ typedef struct SMetaCache SMetaCache; typedef struct SMetaIdx SMetaIdx; typedef struct SMetaDB SMetaDB; +SMeta* metaOpen(const char* path, const SMetaCfg* pMetaCfg, SMemAllocatorFactory* pMAF); +void metaClose(SMeta* pMeta); +void metaRemove(const char* path); +int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg); +int metaDropTable(SMeta* pMeta, tb_uid_t uid); +int metaCommit(SMeta* pMeta); +int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg); +int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); +STbCfg* metaGetTbInfoByUid(SMeta* pMeta, tb_uid_t uid); +STbCfg* metaGetTbInfoByName(SMeta* pMeta, char* tbname, tb_uid_t* uid); + // SMetaDB int metaOpenDB(SMeta* pMeta); void metaCloseDB(SMeta* pMeta); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 29a45723c4..7168493666 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -17,6 +17,9 @@ #define _TD_VNODE_DEF_H_ #include "executor.h" +#include "filter.h" +#include "qworker.h" +#include "sync.h" #include "tchecksum.h" #include "tcoding.h" #include "tcompression.h" @@ -25,14 +28,15 @@ #include "tglobal.h" #include "tlist.h" #include "tlockfree.h" +#include "tlosertree.h" #include "tmacro.h" #include "tmallocator.h" #include "tskiplist.h" +#include "tstream.h" #include "ttime.h" #include "ttimer.h" #include "vnode.h" #include "wal.h" -#include "qworker.h" #ifdef __cplusplus extern "C" { diff --git a/source/dnode/vnode/src/meta/metaBDBImpl.c b/source/dnode/vnode/src/meta/metaBDBImpl.c index a8270f746d..249e489029 100644 --- a/source/dnode/vnode/src/meta/metaBDBImpl.c +++ b/source/dnode/vnode/src/meta/metaBDBImpl.c @@ -16,10 +16,7 @@ #define ALLOW_FORBID_FUNC #include "db.h" -#include "metaDef.h" - -#include "tcoding.h" -#include "thash.h" +#include "vnodeInt.h" #define IMPL_WITH_LOCK 1 // #if IMPL_WITH_LOCK @@ -262,7 +259,7 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) { return 0; } -int metaRemoveSmaFromDb(SMeta *pMeta, int64_t indexUid) { +int metaRemoveSmaFromDb(SMeta *pMeta, int64_t indexUid) { // TODO #if 0 DBT key = {0}; @@ -668,7 +665,7 @@ STbCfg *metaGetTbInfoByName(SMeta *pMeta, char *tbname, tb_uid_t *uid) { } void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode) { - STSma * pCfg = NULL; + STSma *pCfg = NULL; SMetaDB *pDB = pMeta->pDB; DBT key = {0}; DBT value = {0}; @@ -711,9 +708,9 @@ static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_ int ret; void *pBuf; // SSchema *pSchema; - SSchemaKey schemaKey = {uid, sver, 0}; - DBT key = {0}; - DBT value = {0}; + SSchemaKey schemaKey = {uid, sver, 0}; + DBT key = {0}; + DBT value = {0}; // Set key/value properties key.data = &schemaKey; @@ -761,14 +758,14 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta) { } int metaGetTbNum(SMeta *pMeta) { - SMetaDB *pDB = pMeta->pDB; + SMetaDB *pDB = pMeta->pDB; DB_BTREE_STAT *sp1; pDB->pTbDB->stat(pDB->pNtbIdx, NULL, &sp1, 0); - + DB_BTREE_STAT *sp2; pDB->pTbDB->stat(pDB->pCtbIdx, NULL, &sp2, 0); - + return sp1->bt_nkeys + sp2->bt_nkeys; } diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 6dea4a4e57..5022d0e050 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -11,4 +11,6 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ + +#include "vnodeInt.h" \ No newline at end of file diff --git a/source/dnode/vnode/src/meta/metaTDBImpl.c b/source/dnode/vnode/src/meta/metaTDBImpl.c index 6f218ad72b..9b9f54b5ba 100644 --- a/source/dnode/vnode/src/meta/metaTDBImpl.c +++ b/source/dnode/vnode/src/meta/metaTDBImpl.c @@ -15,7 +15,6 @@ #include "vnodeInt.h" -#include "tdbInt.h" typedef struct SPoolMem { int64_t size; struct SPoolMem *prev; @@ -27,18 +26,18 @@ typedef struct SPoolMem { static SPoolMem *openPool(); static void clearPool(SPoolMem *pPool); static void closePool(SPoolMem *pPool); -static void * poolMalloc(void *arg, size_t size); +static void *poolMalloc(void *arg, size_t size); static void poolFree(void *arg, void *ptr); struct SMetaDB { TXN txn; - TENV * pEnv; - TDB * pTbDB; - TDB * pSchemaDB; - TDB * pNameIdx; - TDB * pStbIdx; - TDB * pNtbIdx; - TDB * pCtbIdx; + TENV *pEnv; + TDB *pTbDB; + TDB *pSchemaDB; + TDB *pNameIdx; + TDB *pStbIdx; + TDB *pNtbIdx; + TDB *pCtbIdx; SPoolMem *pPool; #ifdef META_TDB_SMA_TEST TDB *pSmaDB; @@ -52,7 +51,7 @@ typedef struct __attribute__((__packed__)) { } SSchemaDbKey; typedef struct { - char * name; + char *name; tb_uid_t uid; } SNameIdxKey; @@ -251,14 +250,14 @@ void metaCloseDB(SMeta *pMeta) { int metaSaveTableToDB(SMeta *pMeta, STbCfg *pTbCfg) { tb_uid_t uid; - SMetaDB * pMetaDb; - void * pKey; - void * pVal; + SMetaDB *pMetaDb; + void *pKey; + void *pVal; int kLen; int vLen; int ret; char buf[512]; - void * pBuf; + void *pBuf; SCtbIdxKey ctbIdxKey; SSchemaDbKey schemaDbKey; SSchemaWrapper schemaWrapper; @@ -375,11 +374,11 @@ int metaRemoveTableFromDb(SMeta *pMeta, tb_uid_t uid) { STbCfg *metaGetTbInfoByUid(SMeta *pMeta, tb_uid_t uid) { int ret; SMetaDB *pMetaDb = pMeta->pDB; - void * pKey; - void * pVal; + void *pKey; + void *pVal; int kLen; int vLen; - STbCfg * pTbCfg; + STbCfg *pTbCfg; // Fetch pKey = &uid; @@ -431,14 +430,14 @@ SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, boo } static SSchemaWrapper *metaGetTableSchemaImpl(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline, bool isGetEx) { - void * pKey; - void * pVal; + void *pKey; + void *pVal; int kLen; int vLen; int ret; SSchemaDbKey schemaDbKey; SSchemaWrapper *pSchemaWrapper; - void * pBuf; + void *pBuf; // fetch schemaDbKey.uid = uid; @@ -465,9 +464,9 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { tb_uid_t quid; SSchemaWrapper *pSW; STSchemaBuilder sb; - SSchemaEx * pSchema; - STSchema * pTSchema; - STbCfg * pTbCfg; + SSchemaEx *pSchema; + STSchema *pTSchema; + STbCfg *pTbCfg; pTbCfg = metaGetTbInfoByUid(pMeta, uid); if (pTbCfg->type == META_CHILD_TABLE) { @@ -498,7 +497,7 @@ struct SMTbCursor { SMTbCursor *metaOpenTbCursor(SMeta *pMeta) { SMTbCursor *pTbCur = NULL; - SMetaDB * pDB = pMeta->pDB; + SMetaDB *pDB = pMeta->pDB; pTbCur = (SMTbCursor *)taosMemoryCalloc(1, sizeof(*pTbCur)); if (pTbCur == NULL) { @@ -520,12 +519,12 @@ void metaCloseTbCursor(SMTbCursor *pTbCur) { } char *metaTbCursorNext(SMTbCursor *pTbCur) { - void * pKey = NULL; - void * pVal = NULL; + void *pKey = NULL; + void *pVal = NULL; int kLen; int vLen; int ret; - void * pBuf; + void *pBuf; STbCfg tbCfg; for (;;) { @@ -548,17 +547,17 @@ char *metaTbCursorNext(SMTbCursor *pTbCur) { } struct SMCtbCursor { - TDBC * pCur; + TDBC *pCur; tb_uid_t suid; - void * pKey; - void * pVal; + void *pKey; + void *pVal; int kLen; int vLen; }; SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid) { SMCtbCursor *pCtbCur = NULL; - SMetaDB * pDB = pMeta->pDB; + SMetaDB *pDB = pMeta->pDB; int ret; pCtbCur = (SMCtbCursor *)taosMemoryCalloc(1, sizeof(*pCtbCur)); @@ -654,7 +653,6 @@ STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid) { continue; } - ++pSW->number; STSma *tptr = (STSma *)taosMemoryRealloc(pSW->tSma, pSW->number * sizeof(STSma)); if (tptr == NULL) { @@ -709,10 +707,10 @@ int metaSaveSmaToDB(SMeta *pMeta, STSma *pSmaCfg) { // ASSERT(0); #ifdef META_TDB_SMA_TEST - int32_t ret = 0; + int32_t ret = 0; SMetaDB *pMetaDb = pMeta->pDB; - void *pBuf = NULL, *qBuf = NULL; - void *key = {0}, *val = {0}; + void *pBuf = NULL, *qBuf = NULL; + void *key = {0}, *val = {0}; // save sma info int32_t len = tEncodeTSma(NULL, pSmaCfg); @@ -1103,7 +1101,7 @@ static void closePool(SPoolMem *pPool) { } static void *poolMalloc(void *arg, size_t size) { - void * ptr = NULL; + void *ptr = NULL; SPoolMem *pPool = (SPoolMem *)arg; SPoolMem *pMem; diff --git a/source/dnode/vnode/src/meta/metaTbCfg.c b/source/dnode/vnode/src/meta/metaTbCfg.c index 8ecc808786..9d5012c17f 100644 --- a/source/dnode/vnode/src/meta/metaTbCfg.c +++ b/source/dnode/vnode/src/meta/metaTbCfg.c @@ -14,7 +14,6 @@ */ #include "vnodeInt.h" -#include "tcoding.h" int metaValidateTbCfg(SMeta *pMeta, const STbCfg *pTbOptions) { // TODO diff --git a/source/dnode/vnode/src/meta/metaTbTag.c b/source/dnode/vnode/src/meta/metaTbTag.c index 6dea4a4e57..5022d0e050 100644 --- a/source/dnode/vnode/src/meta/metaTbTag.c +++ b/source/dnode/vnode/src/meta/metaTbTag.c @@ -11,4 +11,6 @@ * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . - */ \ No newline at end of file + */ + +#include "vnodeInt.h" \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index f48262deba..f57103aab4 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -13,9 +13,6 @@ * along with this program. If not, see . */ -#include "tcompare.h" -#include "tdatablock.h" -#include "tstream.h" #include "vnodeInt.h" int32_t tqInit() { return tqPushMgrInit(); } diff --git a/source/dnode/vnode/src/tq/tqCommit.c b/source/dnode/vnode/src/tq/tqCommit.c index f2f48bbc8a..8e59243a9c 100644 --- a/source/dnode/vnode/src/tq/tqCommit.c +++ b/source/dnode/vnode/src/tq/tqCommit.c @@ -12,3 +12,5 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ + +#include "vnodeInt.h" diff --git a/source/dnode/vnode/src/tq/tqMetaStore.c b/source/dnode/vnode/src/tq/tqMetaStore.c index 84f12f93c6..357917e0ba 100644 --- a/source/dnode/vnode/src/tq/tqMetaStore.c +++ b/source/dnode/vnode/src/tq/tqMetaStore.c @@ -14,13 +14,13 @@ */ #include "vnodeInt.h" // TODO:replace by an abstract file layer -#include -#include -#include -#include "osDir.h" +// #include +// #include +// #include +// #include "osDir.h" #define TQ_META_NAME "tq.meta" -#define TQ_IDX_NAME "tq.idx" +#define TQ_IDX_NAME "tq.idx" static int32_t tqHandlePutCommitted(STqMetaStore*, int64_t key, void* value); static void* tqHandleGetUncommitted(STqMetaStore*, int64_t key); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 21dc6e35e8..9282f7197e 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -13,8 +13,7 @@ * along with this program. If not, see . */ -#include "tdatablock.h" -#include "vnode.h" +#include "vnodeInt.h" STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { STqReadHandle* pReadHandle = taosMemoryMalloc(sizeof(STqReadHandle)); diff --git a/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c b/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c index 0deef2e4c9..acd9c2dcaa 100644 --- a/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbBDBImpl.c @@ -16,9 +16,7 @@ #define ALLOW_FORBID_FUNC #include "db.h" -#include "taoserror.h" -#include "tcoding.h" -#include "thash.h" +#include "vnodeInt.h" #define IMPL_WITH_LOCK 1 @@ -139,7 +137,7 @@ int32_t tsdbSaveSmaToDB(SDBFile *pDBF, void *key, uint32_t keySize, void *data, return 0; } -void *tsdbGetSmaDataByKey(SDBFile *pDBF, void* key, uint32_t keySize, uint32_t *valueSize) { +void *tsdbGetSmaDataByKey(SDBFile *pDBF, void *key, uint32_t keySize, uint32_t *valueSize) { void *result = NULL; DBT key1 = {0}; DBT value1 = {0}; diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index eff350ddda..bd3888864d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -13,9 +13,7 @@ * along with this program. If not, see . */ -#include #include "vnodeInt.h" -#include "os.h" typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T; static const char *tsdbTxnFname[] = {"current.t", "current"}; @@ -97,8 +95,8 @@ static int tsdbEncodeDFileSetArray(void **buf, SArray *pArray) { return tlen; } -static void *tsdbDecodeDFileSetArray(STsdb*pRepo, void *buf, SArray *pArray) { - uint64_t nset = 0; +static void *tsdbDecodeDFileSetArray(STsdb *pRepo, void *buf, SArray *pArray) { + uint64_t nset = 0; taosArrayClear(pArray); @@ -122,7 +120,7 @@ static int tsdbEncodeFSStatus(void **buf, SFSStatus *pStatus) { return tlen; } -static void *tsdbDecodeFSStatus(STsdb*pRepo, void *buf, SFSStatus *pStatus) { +static void *tsdbDecodeFSStatus(STsdb *pRepo, void *buf, SFSStatus *pStatus) { tsdbResetFSStatus(pStatus); // pStatus->pmf = &(pStatus->mf); @@ -407,8 +405,8 @@ int tsdbUpdateDFileSet(STsdbFS *pfs, const SDFileSet *pSet) { return tsdbAddDFil static int tsdbSaveFSStatus(STsdb *pRepo, SFSStatus *pStatus) { SFSHeader fsheader; - void * pBuf = NULL; - void * ptr; + void *pBuf = NULL; + void *ptr; char hbuf[TSDB_FILE_HEAD_SIZE] = "\0"; char tfname[TSDB_FILENAME_LEN] = "\0"; char cfname[TSDB_FILENAME_LEN] = "\0"; @@ -592,7 +590,7 @@ void tsdbFSIterSeek(SFSIter *pIter, int fid) { } SDFileSet *tsdbFSIterNext(SFSIter *pIter) { - STsdbFS * pfs = pIter->pfs; + STsdbFS *pfs = pIter->pfs; SDFileSet *pSet; if (pIter->index < 0) { @@ -651,12 +649,12 @@ static void tsdbGetTxnFname(STsdb *pRepo, TSDB_TXN_FILE_T ftype, char fname[]) { } static int tsdbOpenFSFromCurrent(STsdb *pRepo) { - STsdbFS * pfs = REPO_FS(pRepo); + STsdbFS *pfs = REPO_FS(pRepo); TdFilePtr pFile = NULL; - void * buffer = NULL; + void *buffer = NULL; SFSHeader fsheader; char current[TSDB_FILENAME_LEN] = "\0"; - void * ptr; + void *ptr; tsdbGetTxnFname(pRepo, TSDB_TXN_CURR_FILE, current); @@ -746,7 +744,7 @@ _err: // Scan and try to fix incorrect files static int tsdbScanAndTryFixFS(STsdb *pRepo) { - STsdbFS * pfs = REPO_FS(pRepo); + STsdbFS *pfs = REPO_FS(pRepo); SFSStatus *pStatus = pfs->cstatus; // if (tsdbScanAndTryFixMFile(pRepo) < 0) { @@ -908,9 +906,9 @@ static int tsdbScanAndTryFixFS(STsdb *pRepo) { // } static int tsdbScanRootDir(STsdb *pRepo) { - char rootDir[TSDB_FILENAME_LEN]; - char bname[TSDB_FILENAME_LEN]; - STsdbFS * pfs = REPO_FS(pRepo); + char rootDir[TSDB_FILENAME_LEN]; + char bname[TSDB_FILENAME_LEN]; + STsdbFS *pfs = REPO_FS(pRepo); const STfsFile *pf; tsdbGetRootDir(REPO_ID(pRepo), rootDir); @@ -942,9 +940,9 @@ static int tsdbScanRootDir(STsdb *pRepo) { } static int tsdbScanDataDir(STsdb *pRepo) { - char dataDir[TSDB_FILENAME_LEN]; - char bname[TSDB_FILENAME_LEN]; - STsdbFS * pfs = REPO_FS(pRepo); + char dataDir[TSDB_FILENAME_LEN]; + char bname[TSDB_FILENAME_LEN]; + STsdbFS *pfs = REPO_FS(pRepo); const STfsFile *pf; tsdbGetDataDir(REPO_ID(pRepo), dataDir); @@ -1107,14 +1105,14 @@ static bool tsdbIsTFileInFS(STsdbFS *pfs, const STfsFile *pf) { // } static int tsdbRestoreDFileSet(STsdb *pRepo) { - char dataDir[TSDB_FILENAME_LEN]; - char bname[TSDB_FILENAME_LEN]; - STfsDir * tdir = NULL; + char dataDir[TSDB_FILENAME_LEN]; + char bname[TSDB_FILENAME_LEN]; + STfsDir *tdir = NULL; const STfsFile *pf = NULL; - const char * pattern = "^v[0-9]+f[0-9]+\\.(head|data|last|smad|smal)(-ver[0-9]+)?$"; - SArray * fArray = NULL; - regex_t regex; - STsdbFS * pfs = REPO_FS(pRepo); + const char *pattern = "^v[0-9]+f[0-9]+\\.(head|data|last|smad|smal)(-ver[0-9]+)?$"; + SArray *fArray = NULL; + regex_t regex; + STsdbFS *pfs = REPO_FS(pRepo); tsdbGetDataDir(REPO_ID(pRepo), dataDir); @@ -1327,7 +1325,7 @@ static int tsdbComparTFILE(const void *arg1, const void *arg2) { } static void tsdbScanAndTryFixDFilesHeader(STsdb *pRepo, int32_t *nExpired) { - STsdbFS * pfs = REPO_FS(pRepo); + STsdbFS *pfs = REPO_FS(pRepo); SFSStatus *pStatus = pfs->cstatus; SDFInfo info; diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 550e7cd183..9509dfa462 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -13,18 +13,6 @@ * along with this program. If not, see . */ -#include "os.h" -#include "talgo.h" -#include "tcompare.h" -#include "tdatablock.h" -#include "tdataformat.h" -#include "texception.h" -#include "vnodeInt.h" - -#include "filter.h" -#include "taosdef.h" -#include "tlosertree.h" -#include "tmsg.h" #include "vnodeInt.h" #define EXTRA_BYTES 2 diff --git a/source/dnode/vnode/src/vnd/vnodeInt.c b/source/dnode/vnode/src/vnd/vnodeInt.c index 24294c4b58..270dc377b9 100644 --- a/source/dnode/vnode/src/vnd/vnodeInt.c +++ b/source/dnode/vnode/src/vnd/vnodeInt.c @@ -14,7 +14,6 @@ */ #define _DEFAULT_SOURCE -#include "sync.h" #include "vnodeInt.h" // #include "vnodeInt.h" diff --git a/source/dnode/vnode/src/vnd/vnodeMgr.c b/source/dnode/vnode/src/vnd/vnodeMgr.c index 40f43bcd12..df5e2ceffa 100644 --- a/source/dnode/vnode/src/vnd/vnodeMgr.c +++ b/source/dnode/vnode/src/vnd/vnodeMgr.c @@ -14,7 +14,6 @@ */ #include "vnodeInt.h" -#include "tglobal.h" SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED}; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index f56ded9f15..965dff59e1 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -13,7 +13,6 @@ * along with this program. If not, see . */ -#include "executor.h" #include "vnodeInt.h" static int32_t vnodeGetTableList(SVnode *pVnode, SRpcMsg *pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeWrite.c b/source/dnode/vnode/src/vnd/vnodeWrite.c index 0449319dc2..24b5d4bae5 100644 --- a/source/dnode/vnode/src/vnd/vnodeWrite.c +++ b/source/dnode/vnode/src/vnd/vnodeWrite.c @@ -81,7 +81,7 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { // TODO: to encapsule a free API taosMemoryFree(vCreateTbReq.stbCfg.pSchema); taosMemoryFree(vCreateTbReq.stbCfg.pTagSchema); - if(vCreateTbReq.stbCfg.pRSmaParam) { + if (vCreateTbReq.stbCfg.pRSmaParam) { taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam->pFuncIds); taosMemoryFree(vCreateTbReq.stbCfg.pRSmaParam); } @@ -235,13 +235,13 @@ int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { if (tsdbCreateTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { // TODO } - // } break; - // case TDMT_VND_CANCEL_SMA: { // timeRangeSMA - // } break; - // case TDMT_VND_DROP_SMA: { // timeRangeSMA - // if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { - // // TODO - // } + // } break; + // case TDMT_VND_CANCEL_SMA: { // timeRangeSMA + // } break; + // case TDMT_VND_DROP_SMA: { // timeRangeSMA + // if (tsdbDropTSma(pVnode->pTsdb, POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead))) < 0) { + // // TODO + // } #if 0 tsdbTSmaSub(pVnode->pTsdb, 1); SVDropTSmaReq vDropSmaReq = {0};