From f80768efb323cc72200ed6d82b37ebaccbc5e2bd Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Apr 2022 02:53:10 +0000 Subject: [PATCH 1/8] refact vnode --- source/dnode/vnode/inc/vnode.h | 83 +--------- source/dnode/vnode/src/inc/meta.h | 54 +++++-- source/dnode/vnode/src/inc/tq.h | 81 +++++----- source/dnode/vnode/src/inc/tsdb.h | 251 ++++++++++++++++-------------- 4 files changed, 222 insertions(+), 247 deletions(-) diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 6e85c4df64..c7a8a2a83d 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -60,34 +60,15 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad); int vnodeValidateTableHash(SVnodeCfg *pVnodeOptions, char *tableFName); // meta -typedef struct SMeta SMeta; // todo: remove -typedef struct SMTbCursor SMTbCursor; // todo: remove -typedef struct SMCtbCursor SMCtbCursor; // todo: remove -typedef struct SMSmaCursor SMSmaCursor; // todo: remove - -#define META_SUPER_TABLE TD_SUPER_TABLE -#define META_CHILD_TABLE TD_CHILD_TABLE -#define META_NORMAL_TABLE TD_NORMAL_TABLE +typedef struct SMeta SMeta; // todo: remove +typedef struct SMTbCursor SMTbCursor; typedef SVCreateTbReq STbCfg; typedef SVCreateTSmaReq SSmaCfg; -SSchemaWrapper *metaGetTableSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver, bool isinline); -STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver); -void *metaGetSmaInfoByIndex(SMeta *pMeta, int64_t indexUid, bool isDecode); -STSmaWrapper *metaGetSmaInfoByTable(SMeta *pMeta, tb_uid_t uid); -SArray *metaGetSmaTbUids(SMeta *pMeta, bool isDup); -int metaGetTbNum(SMeta *pMeta); -SMTbCursor *metaOpenTbCursor(SMeta *pMeta); -void metaCloseTbCursor(SMTbCursor *pTbCur); -char *metaTbCursorNext(SMTbCursor *pTbCur); -SMCtbCursor *metaOpenCtbCursor(SMeta *pMeta, tb_uid_t uid); -void metaCloseCtbCurosr(SMCtbCursor *pCtbCur); -tb_uid_t metaCtbCursorNext(SMCtbCursor *pCtbCur); - -SMSmaCursor *metaOpenSmaCursor(SMeta *pMeta, tb_uid_t uid); -void metaCloseSmaCursor(SMSmaCursor *pSmaCur); -int64_t metaSmaCursorNext(SMSmaCursor *pSmaCur); +SMTbCursor *metaOpenTbCursor(SMeta *pMeta); +void metaCloseTbCursor(SMTbCursor *pTbCur); +char *metaTbCursorNext(SMTbCursor *pTbCur); // tsdb typedef struct STsdb STsdb; @@ -98,18 +79,7 @@ typedef void *tsdbReaderT; #define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 #define BLOCK_LOAD_TABLE_SEQ_ORDER 2 #define BLOCK_LOAD_TABLE_RR_ORDER 3 -#define TABLE_TID(t) (t)->tid -#define TABLE_UID(t) (t)->uid -STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, - STfs *pTfs); -void tsdbClose(STsdb *); -void tsdbRemove(const char *path); -int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp); -int tsdbPrepareCommit(STsdb *pTsdb); -int tsdbCommit(STsdb *pTsdb); -int32_t tsdbInitSma(STsdb *pTsdb); -int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg); -int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg); + tsdbReaderT *tsdbQueryTables(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *tableInfoGroup, uint64_t qId, uint64_t taskId); tsdbReaderT tsdbQueryCacheLast(STsdb *tsdb, STsdbQueryCond *pCond, STableGroupInfo *groupList, uint64_t qId, @@ -127,18 +97,8 @@ SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumn void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo); -void tsdbCleanupReadHandle(tsdbReaderT queryHandle); -int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version); -int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg); -int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); -int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); // tq -enum { - TQ_STREAM_TOKEN__DATA = 1, - TQ_STREAM_TOKEN__WATERMARK, - TQ_STREAM_TOKEN__CHECKPOINT, -}; typedef struct STqReadHandle STqReadHandle; @@ -202,21 +162,6 @@ struct SVnodeCfg { int8_t hashMethod; }; -struct STqReadHandle { - int64_t ver; - int64_t tbUid; - SHashObj *tbIdHash; - const SSubmitReq *pMsg; - SSubmitBlk *pBlock; - SSubmitMsgIter msgIter; - SSubmitBlkIter blkIter; - SMeta *pVnodeMeta; - SArray *pColIdList; // SArray - int32_t sver; - SSchemaWrapper *pSchemaWrapper; - STSchema *pSchema; -}; - struct SDataStatis { int16_t colId; int16_t maxIndex; @@ -241,22 +186,6 @@ typedef struct { uint64_t uid; } STableKeyInfo; -typedef struct STable { - uint64_t tid; - uint64_t uid; - STSchema *pSchema; -} STable; - -typedef struct { - int8_t type; - int8_t reserved[7]; - union { - void *data; - int64_t wmTs; - int64_t checkpointId; - }; -} STqStreamToken; - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index b3553f5d2d..e21ef5893b 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -20,20 +20,48 @@ extern "C" { #endif -typedef struct SMetaCache SMetaCache; -typedef struct SMetaIdx SMetaIdx; -typedef struct SMetaDB SMetaDB; +typedef struct SMetaCache SMetaCache; +typedef struct SMetaIdx SMetaIdx; +typedef struct SMetaDB SMetaDB; +typedef struct SMCtbCursor SMCtbCursor; +typedef struct SMSmaCursor SMSmaCursor; -SMeta* metaOpen(const char* path, const SMetaCfg* pMetaCfg, SMemAllocatorFactory* pMAF); -void metaClose(SMeta* pMeta); -void metaRemove(const char* path); -int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg); -int metaDropTable(SMeta* pMeta, tb_uid_t uid); -int metaCommit(SMeta* pMeta); -int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg); -int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); -STbCfg* metaGetTbInfoByUid(SMeta* pMeta, tb_uid_t uid); -STbCfg* metaGetTbInfoByName(SMeta* pMeta, char* tbname, tb_uid_t* uid); +// metaDebug ================== +// clang-format off +#define metaFatal(...) do { if (metaDebugFlag & DEBUG_FATAL) { taosPrintLog("META FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) +#define metaError(...) do { if (metaDebugFlag & DEBUG_ERROR) { taosPrintLog("META ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) +#define metaWarn(...) do { if (metaDebugFlag & DEBUG_WARN) { taosPrintLog("META WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) +#define metaInfo(...) do { if (metaDebugFlag & DEBUG_INFO) { taosPrintLog("META ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) +#define metaDebug(...) do { if (metaDebugFlag & DEBUG_DEBUG) { taosPrintLog("META ", DEBUG_DEBUG, metaDebugFlag, __VA_ARGS__); }} while(0) +#define metaTrace(...) do { if (metaDebugFlag & DEBUG_TRACE) { taosPrintLog("META ", DEBUG_TRACE, metaDebugFlag, __VA_ARGS__); }} while(0) +// clang-format on + +#define META_SUPER_TABLE TD_SUPER_TABLE +#define META_CHILD_TABLE TD_CHILD_TABLE +#define META_NORMAL_TABLE TD_NORMAL_TABLE + +SMeta* metaOpen(const char* path, const SMetaCfg* pMetaCfg, SMemAllocatorFactory* pMAF); +void metaClose(SMeta* pMeta); +void metaRemove(const char* path); +int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg); +int metaDropTable(SMeta* pMeta, tb_uid_t uid); +int metaCommit(SMeta* pMeta); +int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg); +int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); +STbCfg* metaGetTbInfoByUid(SMeta* pMeta, tb_uid_t uid); +STbCfg* metaGetTbInfoByName(SMeta* pMeta, char* tbname, tb_uid_t* uid); +SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); +STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); +void* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid, bool isDecode); +STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid); +SArray* metaGetSmaTbUids(SMeta* pMeta, bool isDup); +int metaGetTbNum(SMeta* pMeta); +SMSmaCursor* metaOpenSmaCursor(SMeta* pMeta, tb_uid_t uid); +void metaCloseSmaCursor(SMSmaCursor* pSmaCur); +int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur); +SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid); +void metaCloseCtbCurosr(SMCtbCursor* pCtbCur); +tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur); // SMetaDB int metaOpenDB(SMeta* pMeta); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index bee443f487..78e1b1ce03 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -20,48 +20,21 @@ extern "C" { #endif -// tqInt.h -#define tqFatal(...) \ - { \ - if (tqDebugFlag & DEBUG_FATAL) { \ - taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \ - } \ - } +// tqDebug =================== +// clang-format off +#define tqFatal(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) +#define tqError(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) +#define tqWarn(...) do { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) +#define tqInfo(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) +#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0) +#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0) +// clang-format on -#define tqError(...) \ - { \ - if (tqDebugFlag & DEBUG_ERROR) { \ - taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \ - } \ - } - -#define tqWarn(...) \ - { \ - if (tqDebugFlag & DEBUG_WARN) { \ - taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); \ - } \ - } - -#define tqInfo(...) \ - { \ - if (tqDebugFlag & DEBUG_INFO) { \ - taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); \ - } \ - } - -#define tqDebug(...) \ - { \ - if (tqDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); \ - } \ - } - -#define tqTrace(...) \ - { \ - if (tqDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); \ - } \ - } +enum { + TQ_STREAM_TOKEN__DATA = 1, + TQ_STREAM_TOKEN__WATERMARK, + TQ_STREAM_TOKEN__CHECKPOINT, +}; #define TQ_BUFFER_SIZE 4 @@ -105,6 +78,31 @@ typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus; typedef struct STqOffsetCfg STqOffsetCfg; typedef struct STqOffsetStore STqOffsetStore; +struct STqReadHandle { + int64_t ver; + int64_t tbUid; + SHashObj* tbIdHash; + const SSubmitReq* pMsg; + SSubmitBlk* pBlock; + SSubmitMsgIter msgIter; + SSubmitBlkIter blkIter; + SMeta* pVnodeMeta; + SArray* pColIdList; // SArray + int32_t sver; + SSchemaWrapper* pSchemaWrapper; + STSchema* pSchema; +}; + +typedef struct { + int8_t type; + int8_t reserved[7]; + union { + void* data; + int64_t wmTs; + int64_t checkpointId; + }; +} STqStreamToken; + typedef struct { int16_t ver; int16_t action; @@ -248,7 +246,6 @@ typedef struct { static STqPushMgmt tqPushMgmt; - int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**); int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index b85caad4c8..63c0ad6cdd 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -20,10 +20,45 @@ extern "C" { #endif +// tsdbDebug ================ +// clang-format off +#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) +#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) +#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) +#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) +#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) +#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) +// clang-format on + typedef struct SSmaStat SSmaStat; typedef struct SSmaEnv SSmaEnv; typedef struct SSmaEnvs SSmaEnvs; +typedef struct STable { + uint64_t tid; + uint64_t uid; + STSchema *pSchema; +} STable; + +#define TABLE_TID(t) (t)->tid +#define TABLE_UID(t) (t)->uid + +STsdb *tsdbOpen(const char *path, int32_t vgId, const STsdbCfg *pTsdbCfg, SMemAllocatorFactory *pMAF, SMeta *pMeta, + STfs *pTfs); +void tsdbClose(STsdb *); +void tsdbRemove(const char *path); +int tsdbInsertData(STsdb *pTsdb, SSubmitReq *pMsg, SSubmitRsp *pRsp); +int tsdbPrepareCommit(STsdb *pTsdb); +int tsdbCommit(STsdb *pTsdb); +int32_t tsdbInitSma(STsdb *pTsdb); +int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg); +int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg); +int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version); +int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg); +int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); +int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); +void tsdbCleanupReadHandle(tsdbReaderT queryHandle); + typedef enum { TSDB_FILE_HEAD = 0, // .head TSDB_FILE_DATA, // .data @@ -93,7 +128,7 @@ typedef struct STsdbMemTable { SMemAllocator *pMA; // Container SSkipList *pSlIdx; // SSkiplist - SHashObj * pHashIdx; + SHashObj *pHashIdx; } STsdbMemTable; typedef struct { @@ -105,16 +140,16 @@ typedef struct { // ================== typedef struct { STsdbFSMeta meta; // FS meta - SArray * df; // data file array - SArray * sf; // sma data file array v2f1900.index_name_1 + SArray *df; // data file array + SArray *sf; // sma data file array v2f1900.index_name_1 } SFSStatus; typedef struct { TdThreadRwlock lock; SFSStatus *cstatus; // current status - SHashObj * metaCache; // meta cache - SHashObj * metaCacheComp; // meta cache for compact + SHashObj *metaCache; // meta cache + SHashObj *metaCacheComp; // meta cache for compact bool intxn; SFSStatus *nstatus; // new status } STsdbFS; @@ -123,15 +158,15 @@ struct STsdb { int32_t vgId; bool repoLocked; TdThreadMutex mutex; - char * path; + char *path; STsdbCfg config; - STsdbMemTable * mem; - STsdbMemTable * imem; + STsdbMemTable *mem; + STsdbMemTable *imem; SRtn rtn; SMemAllocatorFactory *pmaf; - STsdbFS * fs; - SMeta * pMeta; - STfs * pTfs; + STsdbFS *fs; + SMeta *pMeta; + STfs *pTfs; SSmaEnvs smaEnvs; }; @@ -153,16 +188,6 @@ static FORCE_INLINE STSchema *tsdbGetTableSchemaImpl(STable *pTable, bool lock, return pTable->pSchema; } -// tsdbLog -extern int32_t tsdbDebugFlag; - -#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TSDB FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) -#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TSDB ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) -#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TSDB WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) -#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TSDB ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) -#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TSDB ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); }} while(0) -#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0) - // tsdbMemTable.h typedef struct { int rowsInserted; @@ -174,10 +199,10 @@ typedef struct { TSKEY keyLast; } SMergeInfo; -static void * taosTMalloc(size_t size); -static void * taosTCalloc(size_t nmemb, size_t size); -static void * taosTRealloc(void *ptr, size_t size); -static void * taosTZfree(void *ptr); +static void *taosTMalloc(size_t size); +static void *taosTCalloc(size_t nmemb, size_t size); +static void *taosTRealloc(void *ptr, size_t size); +static void *taosTZfree(void *ptr); static size_t taosTSizeof(void *ptr); static void taosTMemset(void *ptr, int c); @@ -398,18 +423,18 @@ static FORCE_INLINE size_t tsdbBlockAggrSize(int nCols, uint32_t blkVer) { } } -int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo); -void tsdbDestroyReadH(SReadH *pReadh); -int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet); -void tsdbCloseAndUnsetFSet(SReadH *pReadh); -int tsdbLoadBlockIdx(SReadH *pReadh); -int tsdbSetReadTable(SReadH *pReadh, STable *pTable); -int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); -int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo); -int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, - int numOfColsIds, bool mergeBitmap); -int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); -int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); +int tsdbInitReadH(SReadH *pReadh, STsdb *pRepo); +void tsdbDestroyReadH(SReadH *pReadh); +int tsdbSetAndOpenReadFSet(SReadH *pReadh, SDFileSet *pSet); +void tsdbCloseAndUnsetFSet(SReadH *pReadh); +int tsdbLoadBlockIdx(SReadH *pReadh); +int tsdbSetReadTable(SReadH *pReadh, STable *pTable); +int tsdbLoadBlockInfo(SReadH *pReadh, void *pTarget); +int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlockInfo); +int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo, const int16_t *colIds, int numOfColsIds, + bool mergeBitmap); +int tsdbLoadBlockStatis(SReadH *pReadh, SBlock *pBlock); +int tsdbEncodeSBlockIdx(void **buf, SBlockIdx *pIdx); void *tsdbDecodeSBlockIdx(void *buf, SBlockIdx *pIdx); void tsdbGetBlockStatis(SReadH *pReadh, SDataStatis *pStatis, int numOfCols, SBlock *pBlock); @@ -448,7 +473,7 @@ static FORCE_INLINE void *taosTMalloc(size_t size) { static FORCE_INLINE void *taosTCalloc(size_t nmemb, size_t size) { size_t tsize = nmemb * size; - void * ret = taosTMalloc(tsize); + void *ret = taosTMalloc(tsize); if (ret == NULL) return NULL; taosTMemset(ret, 0); @@ -459,14 +484,14 @@ static FORCE_INLINE size_t taosTSizeof(void *ptr) { return (ptr) ? (*(size_t *)( static FORCE_INLINE void taosTMemset(void *ptr, int c) { memset(ptr, c, taosTSizeof(ptr)); } -static FORCE_INLINE void * taosTRealloc(void *ptr, size_t size) { +static FORCE_INLINE void *taosTRealloc(void *ptr, size_t size) { if (ptr == NULL) return taosTMalloc(size); if (size <= taosTSizeof(ptr)) return ptr; - void * tptr = (void *)((char *)ptr - sizeof(size_t)); + void *tptr = (void *)((char *)ptr - sizeof(size_t)); size_t tsize = size + sizeof(size_t); - void* tptr1 = taosMemoryRealloc(tptr, tsize); + void *tptr1 = taosMemoryRealloc(tptr, tsize); if (tptr1 == NULL) return NULL; tptr = tptr1; @@ -475,9 +500,9 @@ static FORCE_INLINE void * taosTRealloc(void *ptr, size_t size) { return (void *)((char *)tptr + sizeof(size_t)); } -static FORCE_INLINE void* taosTZfree(void* ptr) { +static FORCE_INLINE void *taosTZfree(void *ptr) { if (ptr) { - taosMemoryFree((void*)((char*)ptr - sizeof(size_t))); + taosMemoryFree((void *)((char *)ptr - sizeof(size_t))); } return NULL; } @@ -530,30 +555,30 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) { // void* tsdbGetSmaDataByKey(SDBFile* pDBF, void* key, uint32_t keySize, uint32_t* valueSize); // tsdbFile -#define TSDB_FILE_HEAD_SIZE 512 -#define TSDB_FILE_DELIMITER 0xF00AFA0F +#define TSDB_FILE_HEAD_SIZE 512 +#define TSDB_FILE_DELIMITER 0xF00AFA0F #define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF -#define TSDB_IVLD_FID INT_MIN -#define TSDB_FILE_STATE_OK 0 -#define TSDB_FILE_STATE_BAD 1 +#define TSDB_IVLD_FID INT_MIN +#define TSDB_FILE_STATE_OK 0 +#define TSDB_FILE_STATE_BAD 1 -#define TSDB_FILE_INFO(tf) (&((tf)->info)) -#define TSDB_FILE_F(tf) (&((tf)->f)) -#define TSDB_FILE_PFILE(tf) ((tf)->pFile) -#define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname) -#define TSDB_FILE_OPENED(tf) (TSDB_FILE_PFILE(tf) != NULL) -#define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf)) -#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_PFILE(f) = NULL) -#define TSDB_FILE_LEVEL(tf) (TSDB_FILE_F(tf)->did.level) -#define TSDB_FILE_ID(tf) (TSDB_FILE_F(tf)->did.id) -#define TSDB_FILE_DID(tf) (TSDB_FILE_F(tf)->did) -#define TSDB_FILE_REL_NAME(tf) (TSDB_FILE_F(tf)->rname) -#define TSDB_FILE_ABS_NAME(tf) (TSDB_FILE_F(tf)->aname) -#define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_PFILE(tf)) -#define TSDB_FILE_STATE(tf) ((tf)->state) +#define TSDB_FILE_INFO(tf) (&((tf)->info)) +#define TSDB_FILE_F(tf) (&((tf)->f)) +#define TSDB_FILE_PFILE(tf) ((tf)->pFile) +#define TSDB_FILE_FULL_NAME(tf) (TSDB_FILE_F(tf)->aname) +#define TSDB_FILE_OPENED(tf) (TSDB_FILE_PFILE(tf) != NULL) +#define TSDB_FILE_CLOSED(tf) (!TSDB_FILE_OPENED(tf)) +#define TSDB_FILE_SET_CLOSED(f) (TSDB_FILE_PFILE(f) = NULL) +#define TSDB_FILE_LEVEL(tf) (TSDB_FILE_F(tf)->did.level) +#define TSDB_FILE_ID(tf) (TSDB_FILE_F(tf)->did.id) +#define TSDB_FILE_DID(tf) (TSDB_FILE_F(tf)->did) +#define TSDB_FILE_REL_NAME(tf) (TSDB_FILE_F(tf)->rname) +#define TSDB_FILE_ABS_NAME(tf) (TSDB_FILE_F(tf)->aname) +#define TSDB_FILE_FSYNC(tf) taosFsyncFile(TSDB_FILE_PFILE(tf)) +#define TSDB_FILE_STATE(tf) ((tf)->state) #define TSDB_FILE_SET_STATE(tf, s) ((tf)->state = (s)) -#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) -#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD) +#define TSDB_FILE_IS_OK(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_OK) +#define TSDB_FILE_IS_BAD(tf) (TSDB_FILE_STATE(tf) == TSDB_FILE_STATE_BAD) typedef int32_t TSDB_FILE_T; typedef enum { @@ -576,19 +601,18 @@ static FORCE_INLINE uint32_t tsdbGetDFSVersion(TSDB_FILE_T fType) { // latest v } } +void tsdbInitDFile(STsdb *pRepo, SDFile *pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype); +void tsdbInitDFileEx(SDFile *pDFile, SDFile *pODFile); +int tsdbEncodeSDFile(void **buf, SDFile *pDFile); +void *tsdbDecodeSDFile(STsdb *pRepo, void *buf, SDFile *pDFile); +int tsdbCreateDFile(STsdb *pRepo, SDFile *pDFile, bool updateHeader, TSDB_FILE_T fType); +int tsdbUpdateDFileHeader(SDFile *pDFile); +int tsdbLoadDFileHeader(SDFile *pDFile, SDFInfo *pInfo); +int tsdbParseDFilename(const char *fname, int *vid, int *fid, TSDB_FILE_T *ftype, uint32_t *version); -void tsdbInitDFile(STsdb *pRepo, SDFile* pDFile, SDiskID did, int fid, uint32_t ver, TSDB_FILE_T ftype); -void tsdbInitDFileEx(SDFile* pDFile, SDFile* pODFile); -int tsdbEncodeSDFile(void** buf, SDFile* pDFile); -void* tsdbDecodeSDFile(STsdb *pRepo, void* buf, SDFile* pDFile); -int tsdbCreateDFile(STsdb *pRepo, SDFile* pDFile, bool updateHeader, TSDB_FILE_T fType); -int tsdbUpdateDFileHeader(SDFile* pDFile); -int tsdbLoadDFileHeader(SDFile* pDFile, SDFInfo* pInfo); -int tsdbParseDFilename(const char* fname, int* vid, int* fid, TSDB_FILE_T* ftype, uint32_t* version); +static FORCE_INLINE void tsdbSetDFileInfo(SDFile *pDFile, SDFInfo *pInfo) { pDFile->info = *pInfo; } -static FORCE_INLINE void tsdbSetDFileInfo(SDFile* pDFile, SDFInfo* pInfo) { pDFile->info = *pInfo; } - -static FORCE_INLINE int tsdbOpenDFile(SDFile* pDFile, int flags) { +static FORCE_INLINE int tsdbOpenDFile(SDFile *pDFile, int flags) { ASSERT(!TSDB_FILE_OPENED(pDFile)); pDFile->pFile = taosOpenFile(TSDB_FILE_FULL_NAME(pDFile), flags); @@ -600,14 +624,14 @@ static FORCE_INLINE int tsdbOpenDFile(SDFile* pDFile, int flags) { return 0; } -static FORCE_INLINE void tsdbCloseDFile(SDFile* pDFile) { +static FORCE_INLINE void tsdbCloseDFile(SDFile *pDFile) { if (TSDB_FILE_OPENED(pDFile)) { taosCloseFile(&pDFile->pFile); TSDB_FILE_SET_CLOSED(pDFile); } } -static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int whence) { +static FORCE_INLINE int64_t tsdbSeekDFile(SDFile *pDFile, int64_t offset, int whence) { // ASSERT(TSDB_FILE_OPENED(pDFile)); int64_t loffset = taosLSeekFile(TSDB_FILE_PFILE(pDFile), offset, whence); @@ -619,7 +643,7 @@ static FORCE_INLINE int64_t tsdbSeekDFile(SDFile* pDFile, int64_t offset, int wh return loffset; } -static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nbyte) { +static FORCE_INLINE int64_t tsdbWriteDFile(SDFile *pDFile, void *buf, int64_t nbyte) { ASSERT(TSDB_FILE_OPENED(pDFile)); int64_t nwrite = taosWriteFile(pDFile->pFile, buf, nbyte); @@ -631,11 +655,11 @@ static FORCE_INLINE int64_t tsdbWriteDFile(SDFile* pDFile, void* buf, int64_t nb return nwrite; } -static FORCE_INLINE void tsdbUpdateDFileMagic(SDFile* pDFile, void* pCksm) { - pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t*)(pCksm), sizeof(TSCKSUM)); +static FORCE_INLINE void tsdbUpdateDFileMagic(SDFile *pDFile, void *pCksm) { + pDFile->info.magic = taosCalcChecksum(pDFile->info.magic, (uint8_t *)(pCksm), sizeof(TSCKSUM)); } -static FORCE_INLINE int tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte, int64_t* offset) { +static FORCE_INLINE int tsdbAppendDFile(SDFile *pDFile, void *buf, int64_t nbyte, int64_t *offset) { ASSERT(TSDB_FILE_OPENED(pDFile)); int64_t toffset; @@ -659,9 +683,9 @@ static FORCE_INLINE int tsdbAppendDFile(SDFile* pDFile, void* buf, int64_t nbyte return (int)nbyte; } -static FORCE_INLINE int tsdbRemoveDFile(SDFile* pDFile) { return tfsRemoveFile(TSDB_FILE_F(pDFile)); } +static FORCE_INLINE int tsdbRemoveDFile(SDFile *pDFile) { return tfsRemoveFile(TSDB_FILE_F(pDFile)); } -static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nbyte) { +static FORCE_INLINE int64_t tsdbReadDFile(SDFile *pDFile, void *buf, int64_t nbyte) { ASSERT(TSDB_FILE_OPENED(pDFile)); int64_t nread = taosReadFile(pDFile->pFile, buf, nbyte); @@ -673,7 +697,7 @@ static FORCE_INLINE int64_t tsdbReadDFile(SDFile* pDFile, void* buf, int64_t nby return nread; } -static FORCE_INLINE int tsdbCopyDFile(SDFile* pSrc, SDFile* pDest) { +static FORCE_INLINE int tsdbCopyDFile(SDFile *pSrc, SDFile *pDest) { if (tfsCopyFile(TSDB_FILE_F(pSrc), TSDB_FILE_F(pDest)) < 0) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; @@ -700,12 +724,12 @@ typedef struct { #define TSDB_LATEST_FSET_VER 0 -#define TSDB_FSET_FID(s) ((s)->fid) -#define TSDB_FSET_STATE(s) ((s)->state) -#define TSDB_FSET_VER(s) ((s)->ver) +#define TSDB_FSET_FID(s) ((s)->fid) +#define TSDB_FSET_STATE(s) ((s)->state) +#define TSDB_FSET_VER(s) ((s)->ver) #define TSDB_DFILE_IN_SET(s, t) ((s)->files + (t)) -#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0)) -#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0)) +#define TSDB_FSET_LEVEL(s) TSDB_FILE_LEVEL(TSDB_DFILE_IN_SET(s, 0)) +#define TSDB_FSET_ID(s) TSDB_FILE_ID(TSDB_DFILE_IN_SET(s, 0)) #define TSDB_FSET_SET_CLOSED(s) \ do { \ for (TSDB_FILE_T ftype = TSDB_FILE_HEAD; ftype < TSDB_FILE_MAX; ftype++) { \ @@ -719,24 +743,24 @@ typedef struct { } \ } while (0); -void tsdbInitDFileSet(STsdb *pRepo, SDFileSet* pSet, SDiskID did, int fid, uint32_t ver); -void tsdbInitDFileSetEx(SDFileSet* pSet, SDFileSet* pOSet); -int tsdbEncodeDFileSet(void** buf, SDFileSet* pSet); -void* tsdbDecodeDFileSet(STsdb *pRepo, void* buf, SDFileSet* pSet); -int tsdbEncodeDFileSetEx(void** buf, SDFileSet* pSet); -void* tsdbDecodeDFileSetEx(void* buf, SDFileSet* pSet); -int tsdbApplyDFileSetChange(SDFileSet* from, SDFileSet* to); -int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet* pSet, bool updateHeader); -int tsdbUpdateDFileSetHeader(SDFileSet* pSet); -int tsdbScanAndTryFixDFileSet(STsdb* pRepo, SDFileSet* pSet); +void tsdbInitDFileSet(STsdb *pRepo, SDFileSet *pSet, SDiskID did, int fid, uint32_t ver); +void tsdbInitDFileSetEx(SDFileSet *pSet, SDFileSet *pOSet); +int tsdbEncodeDFileSet(void **buf, SDFileSet *pSet); +void *tsdbDecodeDFileSet(STsdb *pRepo, void *buf, SDFileSet *pSet); +int tsdbEncodeDFileSetEx(void **buf, SDFileSet *pSet); +void *tsdbDecodeDFileSetEx(void *buf, SDFileSet *pSet); +int tsdbApplyDFileSetChange(SDFileSet *from, SDFileSet *to); +int tsdbCreateDFileSet(STsdb *pRepo, SDFileSet *pSet, bool updateHeader); +int tsdbUpdateDFileSetHeader(SDFileSet *pSet); +int tsdbScanAndTryFixDFileSet(STsdb *pRepo, SDFileSet *pSet); -static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet* pSet) { +static FORCE_INLINE void tsdbCloseDFileSet(SDFileSet *pSet) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { tsdbCloseDFile(TSDB_DFILE_IN_SET(pSet, ftype)); } } -static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) { +static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet *pSet, int flags) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { if (tsdbOpenDFile(TSDB_DFILE_IN_SET(pSet, ftype), flags) < 0) { tsdbCloseDFileSet(pSet); @@ -746,13 +770,13 @@ static FORCE_INLINE int tsdbOpenDFileSet(SDFileSet* pSet, int flags) { return 0; } -static FORCE_INLINE void tsdbRemoveDFileSet(SDFileSet* pSet) { +static FORCE_INLINE void tsdbRemoveDFileSet(SDFileSet *pSet) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { (void)tsdbRemoveDFile(TSDB_DFILE_IN_SET(pSet, ftype)); } } -static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) { +static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet *pSrc, SDFileSet *pDest) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { if (tsdbCopyDFile(TSDB_DFILE_IN_SET(pSrc, ftype), TSDB_DFILE_IN_SET(pDest, ftype)) < 0) { tsdbRemoveDFileSet(pDest); @@ -763,12 +787,12 @@ static FORCE_INLINE int tsdbCopyDFileSet(SDFileSet* pSrc, SDFileSet* pDest) { return 0; } -static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY* minKey, TSKEY* maxKey) { +static FORCE_INLINE void tsdbGetFidKeyRange(int days, int8_t precision, int fid, TSKEY *minKey, TSKEY *maxKey) { *minKey = fid * days * tsTickPerDay[precision]; *maxKey = *minKey + days * tsTickPerDay[precision] - 1; } -static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) { +static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet *pSet) { for (TSDB_FILE_T ftype = 0; ftype < TSDB_FILE_MAX; ftype++) { if (TSDB_FILE_IS_BAD(TSDB_DFILE_IN_SET(pSet, ftype))) { return false; @@ -809,25 +833,25 @@ typedef struct { */ #define FS_CURRENT_STATUS(pfs) ((pfs)->cstatus) -#define FS_NEW_STATUS(pfs) ((pfs)->nstatus) -#define FS_IN_TXN(pfs) (pfs)->intxn -#define FS_VERSION(pfs) ((pfs)->cstatus->meta.version) -#define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version) +#define FS_NEW_STATUS(pfs) ((pfs)->nstatus) +#define FS_IN_TXN(pfs) (pfs)->intxn +#define FS_VERSION(pfs) ((pfs)->cstatus->meta.version) +#define FS_TXN_VERSION(pfs) ((pfs)->nstatus->meta.version) typedef struct { int direction; uint64_t version; // current FS version - STsdbFS * pfs; + STsdbFS *pfs; int index; // used to position next fset when version the same int fid; // used to seek when version is changed SDFileSet *pSet; } SFSIter; -#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC +#define TSDB_FS_ITER_FORWARD TSDB_ORDER_ASC #define TSDB_FS_ITER_BACKWARD TSDB_ORDER_DESC STsdbFS *tsdbNewFS(const STsdbCfg *pCfg); -void * tsdbFreeFS(STsdbFS *pfs); +void *tsdbFreeFS(STsdbFS *pfs); int tsdbOpenFS(STsdb *pRepo); void tsdbCloseFS(STsdb *pRepo); void tsdbStartFSTxn(STsdb *pRepo, int64_t pointsAdd, int64_t storageAdd); @@ -872,7 +896,6 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) { // tsdbSma // #define TSDB_SMA_TEST // remove after test finished - // struct SSmaEnv { // TdThreadRwlock lock; // SDiskID did; @@ -888,7 +911,6 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) { // #define SMA_ENV_STAT(env) ((env)->pStat) // #define SMA_ENV_STAT_ITEMS(env) ((env)->pStat->smaStatItems) - // void tsdbDestroySmaEnv(SSmaEnv *pSmaEnv); // void *tsdbFreeSmaEnv(SSmaEnv *pSmaEnv); // #if 0 @@ -935,5 +957,4 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) { } #endif - #endif /*_TD_VNODE_TSDB_H_*/ \ No newline at end of file From 6dab6153a8bbfa0e8d20061d599a1af7b118c17f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Apr 2022 05:28:11 +0000 Subject: [PATCH 2/8] refact vnode --- source/dnode/vnode/inc/vnode.h | 3 --- source/dnode/vnode/src/inc/tsdb.h | 1 + source/dnode/vnode/src/inc/vnd.h | 37 +++++++++++++++++++++++++ source/dnode/vnode/src/inc/vnodeInt.h | 39 ++------------------------- 4 files changed, 40 insertions(+), 40 deletions(-) create mode 100644 source/dnode/vnode/src/inc/vnd.h diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index c7a8a2a83d..7ed0092d36 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -113,9 +113,6 @@ int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockIn SArray *tqRetrieveDataBlock(STqReadHandle *pHandle); // need to reposition -typedef struct SMgmtWrapper SMgmtWrapper; - -int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg); // structs struct SMetaCfg { diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 63c0ad6cdd..6fbf265d2c 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -58,6 +58,7 @@ int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg); int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); void tsdbCleanupReadHandle(tsdbReaderT queryHandle); +int32_t tdScanAndConvertSubmitMsg(SSubmitReq *pMsg); typedef enum { TSDB_FILE_HEAD = 0, // .head diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h new file mode 100644 index 0000000000..cf62962030 --- /dev/null +++ b/source/dnode/vnode/src/inc/vnd.h @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#ifndef _TD_VND_H_ +#define _TD_VND_H_ + +#ifdef __cplusplus +extern "C" { +#endif + +// vndDebug ==================== +// clang-format off +#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) +#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) +#define vWarn(...) do { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("VND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0) +#define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0) +#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }} while(0) +#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0) +// clang-format on + +#ifdef __cplusplus +} +#endif + +#endif /*_TD_VND_H_*/ \ No newline at end of file diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 7168493666..a3c7f73094 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -110,43 +110,6 @@ int vnodeScheduleTask(SVnodeTask* task); int vnodeQueryOpen(SVnode* pVnode); void vnodeQueryClose(SVnode* pVnode); -#define vFatal(...) \ - do { \ - if (vDebugFlag & DEBUG_FATAL) { \ - taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \ - } \ - } while (0) -#define vError(...) \ - do { \ - if (vDebugFlag & DEBUG_ERROR) { \ - taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \ - } \ - } while (0) -#define vWarn(...) \ - do { \ - if (vDebugFlag & DEBUG_WARN) { \ - taosPrintLog("VND WARN ", DEBUG_WARN, 255, __VA_ARGS__); \ - } \ - } while (0) -#define vInfo(...) \ - do { \ - if (vDebugFlag & DEBUG_INFO) { \ - taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); \ - } \ - } while (0) -#define vDebug(...) \ - do { \ - if (vDebugFlag & DEBUG_DEBUG) { \ - taosPrintLog("VND ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); \ - } \ - } while (0) -#define vTrace(...) \ - do { \ - if (vDebugFlag & DEBUG_TRACE) { \ - taosPrintLog("VND ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); \ - } \ - } while (0) - // vnodeCfg.h extern const SVnodeCfg defaultVnodeOptions; @@ -218,6 +181,8 @@ int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t wo // sma void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); +#include "vnd.h" + #include "meta.h" #include "tsdb.h" From 593adbd50950f7772924e51e0842a77f086c4f28 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Apr 2022 05:53:45 +0000 Subject: [PATCH 3/8] refact vnode --- source/dnode/vnode/CMakeLists.txt | 3 +- source/dnode/vnode/src/inc/tq.h | 20 +++ source/dnode/vnode/src/inc/vnodeInt.h | 24 +--- source/dnode/vnode/src/vnd/vnodeModule.c | 158 +++++++++++++++++++++++ 4 files changed, 182 insertions(+), 23 deletions(-) create mode 100644 source/dnode/vnode/src/vnd/vnodeModule.c diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index 4b8aa5dfaf..ba628a808d 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -10,10 +10,11 @@ target_sources( "src/vnd/vnodeCommit.c" "src/vnd/vnodeInt.c" "src/vnd/vnodeMain.c" - "src/vnd/vnodeMgr.c" "src/vnd/vnodeQuery.c" "src/vnd/vnodeStateMgr.c" "src/vnd/vnodeWrite.c" + # "src/vnd/vnodeModule.c" + "src/vnd/vnodeMgr.c" # meta # "src/meta/metaBDBImpl.c" diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 78e1b1ce03..ce81006661 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -246,6 +246,26 @@ typedef struct { static STqPushMgmt tqPushMgmt; +// init once +int tqInit(); +void tqCleanUp(); + +// open in each vnode +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, + SMemAllocatorFactory* allocFac); +void tqClose(STQ*); +// required by vnode +int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); +int tqCommit(STQ*); + +int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); +int32_t tqProcessSetConnReq(STQ* pTq, char* msg); +int32_t tqProcessRebReq(STQ* pTq, char* msg); +int32_t tqProcessCancelConnReq(STQ* pTq, char* msg); +int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); +int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); +int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); + int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**); int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**); diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index a3c7f73094..6fbf4fb1df 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -35,9 +35,10 @@ #include "tstream.h" #include "ttime.h" #include "ttimer.h" -#include "vnode.h" #include "wal.h" +#include "vnode.h" + #ifdef __cplusplus extern "C" { #endif @@ -157,27 +158,6 @@ void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size); void vmaFree(SVMemAllocator* pVMA, void* ptr); bool vmaIsFull(SVMemAllocator* pVMA); -// init once -int tqInit(); -void tqCleanUp(); - -// open in each vnode -STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal, SMeta* pMeta, STqCfg* tqConfig, - SMemAllocatorFactory* allocFac); -void tqClose(STQ*); - -// required by vnode -int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t version); -int tqCommit(STQ*); - -int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); -int32_t tqProcessSetConnReq(STQ* pTq, char* msg); -int32_t tqProcessRebReq(STQ* pTq, char* msg); -int32_t tqProcessCancelConnReq(STQ* pTq, char* msg); -int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); -int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); - // sma void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c new file mode 100644 index 0000000000..2b5b46a45d --- /dev/null +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -0,0 +1,158 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "vnodeInt.h" + +typedef struct SVnodeTask SVnodeTask; +struct SVnodeTask { + SVnodeTask* next; + SVnodeTask* prev; + int (*execute)(void*); + void* arg; +}; + +struct SVnodeGlobal { + int8_t init; + int8_t stop; + int nthreads; + TdThread* threads; + TdThreadMutex mutex; + TdThreadCond hasTask; + SVnodeTask queue; +}; + +struct SVnodeGlobal vnodeGlobal; + +static void* loop(void* arg); + +int vnodeInit(int nthreads) { + int8_t init; + int ret; + + init = atomic_val_compare_exchange_8(&(vnodeGlobal.init), 0, 1); + if (init) { + return 0; + } + + vnodeGlobal.stop = 0; + + vnodeGlobal.queue.next = &vnodeGlobal.queue; + vnodeGlobal.queue.prev = &vnodeGlobal.queue; + + vnodeGlobal.nthreads = nthreads; + vnodeGlobal.threads = taosMemoryCalloc(nthreads, sizeof(TdThread)); + if (vnodeGlobal.threads == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + vError("failed to init vnode module since: %s", tstrerror(terrno)); + return -1; + } + + taosThreadMutexInit(&vnodeGlobal.mutex, NULL); + taosThreadCondInit(&vnodeGlobal.hasTask, NULL); + + for (int i = 0; i < nthreads; i++) { + taosThreadCreate(&(vnodeGlobal.threads[i]), NULL, loop, NULL); + } + + if (walInit() < 0) { + return -1; + } + + return 0; +} + +void vnodeCleanup() { + int8_t init; + + init = atomic_val_compare_exchange_8(&(vnodeGlobal.init), 1, 0); + if (init == 0) return; + + // set stop + taosThreadMutexLock(&(vnodeGlobal.mutex)); + vnodeGlobal.stop = 1; + taosThreadCondBroadcast(&(vnodeGlobal.hasTask)); + taosThreadMutexUnlock(&(vnodeGlobal.mutex)); + + // wait for threads + for (int i = 0; i < vnodeGlobal.nthreads; i++) { + taosThreadJoin(vnodeGlobal.threads[i], NULL); + } + + // clear source + taosMemoryFreeClear(vnodeGlobal.threads); + taosThreadCondDestroy(&(vnodeGlobal.hasTask)); + taosThreadMutexDestroy(&(vnodeGlobal.mutex)); +} + +int vnodeScheduleTask(int (*execute)(void*), void* arg) { + SVnodeTask* pTask; + + ASSERT(!vnodeGlobal.stop); + + pTask = taosMemoryMalloc(sizeof(*pTask)); + if (pTask == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return -1; + } + + pTask->execute = execute; + pTask->arg = arg; + + taosThreadMutexLock(&(vnodeGlobal.mutex)); + pTask->next = &vnodeGlobal.queue; + pTask->prev = vnodeGlobal.queue.prev; + vnodeGlobal.queue.prev->next = pTask; + vnodeGlobal.queue.prev = pTask; + taosThreadCondSignal(&(vnodeGlobal.hasTask)); + taosThreadMutexUnlock(&(vnodeGlobal.mutex)); + + return 0; +} + +/* ------------------------ STATIC METHODS ------------------------ */ +static void* loop(void* arg) { + SVnodeTask* pTask; + int ret; + + setThreadName("vnode-commit"); + + for (;;) { + taosThreadMutexLock(&(vnodeGlobal.mutex)); + for (;;) { + pTask = vnodeGlobal.queue.next; + if (pTask == &vnodeGlobal.queue) { + // no task + if (vnodeGlobal.stop) { + taosThreadMutexUnlock(&(vnodeGlobal.mutex)); + return NULL; + } else { + taosThreadCondWait(&(vnodeGlobal.hasTask), &(vnodeGlobal.mutex)); + } + } else { + // has task + pTask->prev->next = pTask->next; + pTask->next->prev = pTask->prev; + break; + } + } + + taosThreadMutexUnlock(&(vnodeGlobal.mutex)); + + pTask->execute(pTask->arg); + taosMemoryFree(pTask); + } + + return NULL; +} \ No newline at end of file From 249de584e6863b25c7bd52144897f9197995db64 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Apr 2022 06:16:43 +0000 Subject: [PATCH 4/8] more refact --- source/dnode/vnode/CMakeLists.txt | 4 +- source/dnode/vnode/src/inc/vnd.h | 57 +++++++++++++++++- source/dnode/vnode/src/inc/vnodeInt.h | 75 +----------------------- source/dnode/vnode/src/vnd/vnodeCommit.c | 10 +--- 4 files changed, 63 insertions(+), 83 deletions(-) diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index ba628a808d..ad9153cf83 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -13,8 +13,8 @@ target_sources( "src/vnd/vnodeQuery.c" "src/vnd/vnodeStateMgr.c" "src/vnd/vnodeWrite.c" - # "src/vnd/vnodeModule.c" - "src/vnd/vnodeMgr.c" + "src/vnd/vnodeModule.c" + # "src/vnd/vnodeMgr.c" # meta # "src/meta/metaBDBImpl.c" diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index cf62962030..6957cfdb73 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -20,7 +20,7 @@ extern "C" { #endif -// vndDebug ==================== +// vnodeDebug ==================== // clang-format off #define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0) #define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0) @@ -30,6 +30,61 @@ extern "C" { #define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0) // clang-format on +// vnodeModule ==================== +int vnodeScheduleTask(int (*execute)(void*), void* arg); + +// vnodeQuery ==================== +int vnodeQueryOpen(SVnode* pVnode); +void vnodeQueryClose(SVnode* pVnode); + +#if 1 +// SVBufPool +int vnodeOpenBufPool(SVnode* pVnode); +void vnodeCloseBufPool(SVnode* pVnode); +int vnodeBufPoolSwitch(SVnode* pVnode); +int vnodeBufPoolRecycle(SVnode* pVnode); +void* vnodeMalloc(SVnode* pVnode, uint64_t size); +bool vnodeBufPoolIsFull(SVnode* pVnode); + +SMemAllocatorFactory* vBufPoolGetMAF(SVnode* pVnode); + +// SVMemAllocator +typedef struct SVArenaNode { + TD_SLIST_NODE(SVArenaNode); + uint64_t size; // current node size + void* ptr; + char data[]; +} SVArenaNode; + +typedef struct SVMemAllocator { + T_REF_DECLARE() + TD_DLIST_NODE(SVMemAllocator); + uint64_t capacity; + uint64_t ssize; + uint64_t lsize; + SVArenaNode* pNode; + TD_SLIST(SVArenaNode) nlist; +} SVMemAllocator; + +SVMemAllocator* vmaCreate(uint64_t capacity, uint64_t ssize, uint64_t lsize); +void vmaDestroy(SVMemAllocator* pVMA); +void vmaReset(SVMemAllocator* pVMA); +void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size); +void vmaFree(SVMemAllocator* pVMA, void* ptr); +bool vmaIsFull(SVMemAllocator* pVMA); + +// vnodeCfg.h +extern const SVnodeCfg defaultVnodeOptions; + +int vnodeValidateOptions(const SVnodeCfg*); +void vnodeOptionsCopy(SVnodeCfg* pDest, const SVnodeCfg* pSrc); + +// For commit +#define vnodeShouldCommit vnodeBufPoolIsFull +int vnodeSyncCommit(SVnode* pVnode); +int vnodeAsyncCommit(SVnode* pVnode); +#endif + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 6fbf4fb1df..c956184fc9 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -43,29 +43,13 @@ extern "C" { #endif -typedef struct STQ STQ; - +typedef struct SMeta SMeta; +typedef struct STsdb STsdb; +typedef struct STQ STQ; typedef struct SVState SVState; typedef struct SVBufPool SVBufPool; typedef struct SQWorkerMgmt SQHandle; -typedef struct SVnodeTask { - TD_DLIST_NODE(SVnodeTask); - void* arg; - int (*execute)(void*); -} SVnodeTask; - -typedef struct SVnodeMgr { - td_mode_flag_t vnodeInitFlag; - // For commit - bool stop; - uint16_t nthreads; - TdThread* threads; - TdThreadMutex mutex; - TdThreadCond hasTask; - TD_DLIST(SVnodeTask) queue; -} SVnodeMgr; - typedef struct { int8_t streamType; // sma or other int8_t dstType; @@ -81,8 +65,6 @@ typedef struct { SHashObj* pHash; // streamId -> SStreamSinkInfo } SSink; -extern SVnodeMgr vnodeMgr; - // SVState struct SVState { int64_t processed; @@ -107,57 +89,6 @@ struct SVnode { STfs* pTfs; }; -int vnodeScheduleTask(SVnodeTask* task); -int vnodeQueryOpen(SVnode* pVnode); -void vnodeQueryClose(SVnode* pVnode); - -// vnodeCfg.h -extern const SVnodeCfg defaultVnodeOptions; - -int vnodeValidateOptions(const SVnodeCfg*); -void vnodeOptionsCopy(SVnodeCfg* pDest, const SVnodeCfg* pSrc); - -// For commit -#define vnodeShouldCommit vnodeBufPoolIsFull -int vnodeSyncCommit(SVnode* pVnode); -int vnodeAsyncCommit(SVnode* pVnode); - -// SVBufPool - -int vnodeOpenBufPool(SVnode* pVnode); -void vnodeCloseBufPool(SVnode* pVnode); -int vnodeBufPoolSwitch(SVnode* pVnode); -int vnodeBufPoolRecycle(SVnode* pVnode); -void* vnodeMalloc(SVnode* pVnode, uint64_t size); -bool vnodeBufPoolIsFull(SVnode* pVnode); - -SMemAllocatorFactory* vBufPoolGetMAF(SVnode* pVnode); - -// SVMemAllocator -typedef struct SVArenaNode { - TD_SLIST_NODE(SVArenaNode); - uint64_t size; // current node size - void* ptr; - char data[]; -} SVArenaNode; - -typedef struct SVMemAllocator { - T_REF_DECLARE() - TD_DLIST_NODE(SVMemAllocator); - uint64_t capacity; - uint64_t ssize; - uint64_t lsize; - SVArenaNode* pNode; - TD_SLIST(SVArenaNode) nlist; -} SVMemAllocator; - -SVMemAllocator* vmaCreate(uint64_t capacity, uint64_t ssize, uint64_t lsize); -void vmaDestroy(SVMemAllocator* pVMA); -void vmaReset(SVMemAllocator* pVMA); -void* vmaMalloc(SVMemAllocator* pVMA, uint64_t size); -void vmaFree(SVMemAllocator* pVMA, void* ptr); -bool vmaIsFull(SVMemAllocator* pVMA); - // sma void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index 1235637e19..b4c3725a5e 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -24,16 +24,10 @@ int vnodeAsyncCommit(SVnode *pVnode) { vnodeWaitCommit(pVnode); vnodeBufPoolSwitch(pVnode); - SVnodeTask *pTask = (SVnodeTask *)taosMemoryMalloc(sizeof(*pTask)); - - pTask->execute = vnodeCommit; // TODO - pTask->arg = pVnode; // TODO - tsdbPrepareCommit(pVnode->pTsdb); - // metaPrepareCommit(pVnode->pMeta); - // walPreapareCommit(pVnode->pWal); - vnodeScheduleTask(pTask); + vnodeScheduleTask(vnodeCommit, pVnode); + return 0; } From a6c05e4a1b70552aef6155dd0301205cc0462181 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Apr 2022 07:05:57 +0000 Subject: [PATCH 5/8] refact vnode --- source/dnode/mgmt/vm/vmInt.c | 2 +- source/dnode/vnode/src/vnd/vnodeMgr.c | 113 -------------------------- 2 files changed, 1 insertion(+), 114 deletions(-) delete mode 100644 source/dnode/vnode/src/vnd/vnodeMgr.c diff --git a/source/dnode/mgmt/vm/vmInt.c b/source/dnode/mgmt/vm/vmInt.c index 6a1a5c3987..e9a375688c 100644 --- a/source/dnode/mgmt/vm/vmInt.c +++ b/source/dnode/mgmt/vm/vmInt.c @@ -299,7 +299,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) { goto _OVER; } - if (vnodeInit() != 0) { + if (vnodeInit(tsNumOfCommitThreads) != 0) { dError("failed to init vnode since %s", terrstr()); goto _OVER; } diff --git a/source/dnode/vnode/src/vnd/vnodeMgr.c b/source/dnode/vnode/src/vnd/vnodeMgr.c deleted file mode 100644 index df5e2ceffa..0000000000 --- a/source/dnode/vnode/src/vnd/vnodeMgr.c +++ /dev/null @@ -1,113 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "vnodeInt.h" - -SVnodeMgr vnodeMgr = {.vnodeInitFlag = TD_MOD_UNINITIALIZED}; - -static void* loop(void* arg); - -int vnodeInit() { - if (TD_CHECK_AND_SET_MODE_INIT(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_INITIALIZED) { - return 0; - } - - vnodeMgr.stop = false; - - // Start commit handers - vnodeMgr.nthreads = tsNumOfCommitThreads; - vnodeMgr.threads = taosMemoryCalloc(vnodeMgr.nthreads, sizeof(TdThread)); - if (vnodeMgr.threads == NULL) { - return -1; - } - - taosThreadMutexInit(&(vnodeMgr.mutex), NULL); - taosThreadCondInit(&(vnodeMgr.hasTask), NULL); - TD_DLIST_INIT(&(vnodeMgr.queue)); - - for (uint16_t i = 0; i < vnodeMgr.nthreads; i++) { - taosThreadCreate(&(vnodeMgr.threads[i]), NULL, loop, NULL); - // pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread"); - } - - if (walInit() < 0) { - return -1; - } - - return 0; -} - -void vnodeCleanup() { - if (TD_CHECK_AND_SET_MOD_CLEAR(&(vnodeMgr.vnodeInitFlag)) == TD_MOD_UNINITIALIZED) { - return; - } - - // Stop commit handler - taosThreadMutexLock(&(vnodeMgr.mutex)); - vnodeMgr.stop = true; - taosThreadCondBroadcast(&(vnodeMgr.hasTask)); - taosThreadMutexUnlock(&(vnodeMgr.mutex)); - - for (uint16_t i = 0; i < vnodeMgr.nthreads; i++) { - taosThreadJoin(vnodeMgr.threads[i], NULL); - } - - taosMemoryFreeClear(vnodeMgr.threads); - taosThreadCondDestroy(&(vnodeMgr.hasTask)); - taosThreadMutexDestroy(&(vnodeMgr.mutex)); -} - -int vnodeScheduleTask(SVnodeTask* pTask) { - taosThreadMutexLock(&(vnodeMgr.mutex)); - - TD_DLIST_APPEND(&(vnodeMgr.queue), pTask); - - taosThreadCondSignal(&(vnodeMgr.hasTask)); - - taosThreadMutexUnlock(&(vnodeMgr.mutex)); - - return 0; -} - -/* ------------------------ STATIC METHODS ------------------------ */ -static void* loop(void* arg) { - setThreadName("vnode-commit"); - - SVnodeTask* pTask; - for (;;) { - taosThreadMutexLock(&(vnodeMgr.mutex)); - for (;;) { - pTask = TD_DLIST_HEAD(&(vnodeMgr.queue)); - if (pTask == NULL) { - if (vnodeMgr.stop) { - taosThreadMutexUnlock(&(vnodeMgr.mutex)); - return NULL; - } else { - taosThreadCondWait(&(vnodeMgr.hasTask), &(vnodeMgr.mutex)); - } - } else { - TD_DLIST_POP(&(vnodeMgr.queue), pTask); - break; - } - } - - taosThreadMutexUnlock(&(vnodeMgr.mutex)); - - (*(pTask->execute))(pTask->arg); - taosMemoryFree(pTask); - } - - return NULL; -} \ No newline at end of file From 12e92a80abdaf5511d50261f3d4e3e9cc747afb2 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Apr 2022 07:23:18 +0000 Subject: [PATCH 6/8] add tmux --- .devcontainer/Dockerfile | 2 +- .devcontainer/devcontainer.json | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/.devcontainer/Dockerfile b/.devcontainer/Dockerfile index 01172ae9c9..a23ebdecfd 100644 --- a/.devcontainer/Dockerfile +++ b/.devcontainer/Dockerfile @@ -7,4 +7,4 @@ FROM mcr.microsoft.com/vscode/devcontainers/cpp:0-${VARIANT} # [Optional] Uncomment this section to install additional packages. # RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \ # && apt-get -y install --no-install-recommends -RUN apt-get update && apt-get -y install tree vim +RUN apt-get update && apt-get -y install tree vim tmux diff --git a/.devcontainer/devcontainer.json b/.devcontainer/devcontainer.json index cb05c84d72..f9ad25e275 100644 --- a/.devcontainer/devcontainer.json +++ b/.devcontainer/devcontainer.json @@ -32,7 +32,7 @@ // Use 'forwardPorts' to make a list of ports inside the container available locally. // "forwardPorts": [], // Use 'postCreateCommand' to run commands after the container is created. - // "postCreateCommand": "gcc -v", + "postCreateCommand": "wget https://raw.githubusercontent.com/hzcheng/config/master/.tmux.conf -P /root", // Comment out connect as root instead. More info: https://aka.ms/vscode-remote/containers/non-root. "remoteUser": "root" } \ No newline at end of file From 90fb44be8fcc397d525332e52638b978daea6a84 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Apr 2022 07:35:24 +0000 Subject: [PATCH 7/8] refact --- source/dnode/vnode/CMakeLists.txt | 1 - source/dnode/vnode/src/meta/metaCache.c | 40 ------------------------- source/dnode/vnode/src/meta/metaMain.c | 8 ----- 3 files changed, 49 deletions(-) delete mode 100644 source/dnode/vnode/src/meta/metaCache.c diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index ad9153cf83..e0f2447310 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -18,7 +18,6 @@ target_sources( # meta # "src/meta/metaBDBImpl.c" - "src/meta/metaCache.c" "src/meta/metaCfg.c" "src/meta/metaIdx.c" "src/meta/metaMain.c" diff --git a/source/dnode/vnode/src/meta/metaCache.c b/source/dnode/vnode/src/meta/metaCache.c deleted file mode 100644 index e1507a3757..0000000000 --- a/source/dnode/vnode/src/meta/metaCache.c +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "vnodeInt.h" - -struct SMetaCache { - // TODO -}; - -int metaOpenCache(SMeta *pMeta) { - // TODO - // if (pMeta->options.lruSize) { - // pMeta->pCache = rocksdb_cache_create_lru(pMeta->options.lruSize); - // if (pMeta->pCache == NULL) { - // // TODO: handle error - // return -1; - // } - // } - - return 0; -} - -void metaCloseCache(SMeta *pMeta) { - // if (pMeta->pCache) { - // rocksdb_cache_destroy(pMeta->pCache); - // pMeta->pCache = NULL; - // } -} \ No newline at end of file diff --git a/source/dnode/vnode/src/meta/metaMain.c b/source/dnode/vnode/src/meta/metaMain.c index ac47c32cbf..74280c6fe8 100644 --- a/source/dnode/vnode/src/meta/metaMain.c +++ b/source/dnode/vnode/src/meta/metaMain.c @@ -94,13 +94,6 @@ static void metaFree(SMeta *pMeta) { } static int metaOpenImpl(SMeta *pMeta) { - // Open meta cache - if (metaOpenCache(pMeta) < 0) { - // TODO: handle error - metaCloseImpl(pMeta); - return -1; - } - // Open meta db if (metaOpenDB(pMeta) < 0) { // TODO: handle error @@ -129,5 +122,4 @@ static void metaCloseImpl(SMeta *pMeta) { metaCloseUidGnrt(pMeta); metaCloseIdx(pMeta); metaCloseDB(pMeta); - metaCloseCache(pMeta); } \ No newline at end of file From 224e6ccbcb2c1efa3c709a1567de8cc54303945f Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Thu, 14 Apr 2022 07:51:45 +0000 Subject: [PATCH 8/8] refact vnode --- source/dnode/vnode/CMakeLists.txt | 4 --- source/dnode/vnode/src/inc/meta.h | 5 --- source/dnode/vnode/src/meta/metaCfg.c | 29 --------------- source/dnode/vnode/src/meta/metaMain.c | 14 -------- source/dnode/vnode/src/meta/metaQuery.c | 16 --------- source/dnode/vnode/src/meta/metaTbCfg.c | 48 ------------------------- source/dnode/vnode/src/meta/metaTbTag.c | 16 --------- 7 files changed, 132 deletions(-) delete mode 100644 source/dnode/vnode/src/meta/metaCfg.c delete mode 100644 source/dnode/vnode/src/meta/metaQuery.c delete mode 100644 source/dnode/vnode/src/meta/metaTbCfg.c delete mode 100644 source/dnode/vnode/src/meta/metaTbTag.c diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index e0f2447310..f43f0b427e 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -18,13 +18,9 @@ target_sources( # meta # "src/meta/metaBDBImpl.c" - "src/meta/metaCfg.c" "src/meta/metaIdx.c" "src/meta/metaMain.c" - "src/meta/metaQuery.c" "src/meta/metaTable.c" - "src/meta/metaTbCfg.c" - "src/meta/metaTbTag.c" "src/meta/metaTbUid.c" "src/meta/metaTDBImpl.c" diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index e21ef5893b..d43503c1ca 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -75,11 +75,6 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid); int metaOpenCache(SMeta* pMeta); void metaCloseCache(SMeta* pMeta); -// SMetaCfg -extern const SMetaCfg defaultMetaOptions; -// int metaValidateOptions(const SMetaCfg*); -void metaOptionsCopy(SMetaCfg* pDest, const SMetaCfg* pSrc); - // SMetaIdx int metaOpenIdx(SMeta* pMeta); void metaCloseIdx(SMeta* pMeta); diff --git a/source/dnode/vnode/src/meta/metaCfg.c b/source/dnode/vnode/src/meta/metaCfg.c deleted file mode 100644 index 371c20f157..0000000000 --- a/source/dnode/vnode/src/meta/metaCfg.c +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "vnodeInt.h" - -const SMetaCfg defaultMetaOptions = {.lruSize = 0}; - -/* ------------------------ EXPOSED METHODS ------------------------ */ - -int metaValidateOptions(const SMetaCfg *pMetaOptions) { - // TODO - return 0; -} - -void metaOptionsCopy(SMetaCfg *pDest, const SMetaCfg *pSrc) { memcpy(pDest, pSrc, sizeof(*pSrc)); } - -/* ------------------------ STATIC METHODS ------------------------ */ \ No newline at end of file diff --git a/source/dnode/vnode/src/meta/metaMain.c b/source/dnode/vnode/src/meta/metaMain.c index 74280c6fe8..dd60e56371 100644 --- a/source/dnode/vnode/src/meta/metaMain.c +++ b/source/dnode/vnode/src/meta/metaMain.c @@ -25,17 +25,6 @@ static void metaCloseImpl(SMeta *pMeta); SMeta *metaOpen(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorFactory *pMAF) { SMeta *pMeta = NULL; - // Set default options - if (pMetaCfg == NULL) { - pMetaCfg = &defaultMetaOptions; - } - - // // Validate the options - // if (metaValidateOptions(pMetaCfg) < 0) { - // // TODO: deal with error - // return NULL; - // } - // Allocate handle pMeta = metaNew(path, pMetaCfg, pMAF); if (pMeta == NULL) { @@ -80,9 +69,6 @@ static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorF return NULL; } - metaOptionsCopy(&(pMeta->options), pMetaCfg); - pMeta->pmaf = pMAF; - return pMeta; }; diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c deleted file mode 100644 index 5022d0e050..0000000000 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "vnodeInt.h" \ No newline at end of file diff --git a/source/dnode/vnode/src/meta/metaTbCfg.c b/source/dnode/vnode/src/meta/metaTbCfg.c deleted file mode 100644 index 9d5012c17f..0000000000 --- a/source/dnode/vnode/src/meta/metaTbCfg.c +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "vnodeInt.h" - -int metaValidateTbCfg(SMeta *pMeta, const STbCfg *pTbOptions) { - // TODO - return 0; -} - -size_t metaEncodeTbObjFromTbOptions(const STbCfg *pTbOptions, void *pBuf, size_t bsize) { - void **ppBuf = &pBuf; - int tlen = 0; - - tlen += taosEncodeFixedU8(ppBuf, pTbOptions->type); - tlen += taosEncodeString(ppBuf, pTbOptions->name); - tlen += taosEncodeFixedU32(ppBuf, pTbOptions->ttl); - - switch (pTbOptions->type) { - case META_SUPER_TABLE: - tlen += taosEncodeFixedU64(ppBuf, pTbOptions->stbCfg.suid); - tlen += tdEncodeSchema(ppBuf, (STSchema *)pTbOptions->stbCfg.pTagSchema); - // TODO: encode schema version array - break; - case META_CHILD_TABLE: - tlen += taosEncodeFixedU64(ppBuf, pTbOptions->ctbCfg.suid); - break; - case META_NORMAL_TABLE: - // TODO: encode schema version array - break; - default: - break; - } - - return tlen; -} \ No newline at end of file diff --git a/source/dnode/vnode/src/meta/metaTbTag.c b/source/dnode/vnode/src/meta/metaTbTag.c deleted file mode 100644 index 5022d0e050..0000000000 --- a/source/dnode/vnode/src/meta/metaTbTag.c +++ /dev/null @@ -1,16 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "vnodeInt.h" \ No newline at end of file