From 42c6f01980b8806fe24bbabfb697412d3d82ca22 Mon Sep 17 00:00:00 2001 From: Hongze Cheng Date: Tue, 26 Apr 2022 11:04:26 +0000 Subject: [PATCH] refact vnode includes --- source/dnode/vnode/CMakeLists.txt | 2 - source/dnode/vnode/inc/vnode.h | 8 +-- source/dnode/vnode/src/inc/meta.h | 34 ++--------- source/dnode/vnode/src/inc/tq.h | 12 +--- source/dnode/vnode/src/inc/tsdb.h | 16 +----- source/dnode/vnode/src/inc/tsdbSma.h | 10 +--- source/dnode/vnode/src/inc/vnd.h | 26 +++++++-- source/dnode/vnode/src/inc/vnodeInt.h | 67 ++++++++++++++++++---- source/dnode/vnode/src/inc/vnodeSync.h | 44 -------------- source/dnode/vnode/src/meta/metaCommit.c | 2 +- source/dnode/vnode/src/meta/metaEntry.c | 2 +- source/dnode/vnode/src/meta/metaIdx.c | 2 +- source/dnode/vnode/src/meta/metaOpen.c | 2 +- source/dnode/vnode/src/meta/metaQuery.c | 12 ++-- source/dnode/vnode/src/meta/metaTable.c | 6 +- source/dnode/vnode/src/tq/tq.c | 46 +++++++-------- source/dnode/vnode/src/tq/tqCommit.c | 2 +- source/dnode/vnode/src/tq/tqMetaStore.c | 4 +- source/dnode/vnode/src/tq/tqOffset.c | 2 +- source/dnode/vnode/src/tq/tqRead.c | 2 +- source/dnode/vnode/src/tsdb/tsdbCommit.c | 2 +- source/dnode/vnode/src/tsdb/tsdbCommit2.c | 2 +- source/dnode/vnode/src/tsdb/tsdbCompact.c | 11 ++-- source/dnode/vnode/src/tsdb/tsdbFS.c | 2 +- source/dnode/vnode/src/tsdb/tsdbFile.c | 2 +- source/dnode/vnode/src/tsdb/tsdbMain.c | 2 +- source/dnode/vnode/src/tsdb/tsdbMemTable.c | 2 +- source/dnode/vnode/src/tsdb/tsdbOptions.c | 34 ----------- source/dnode/vnode/src/tsdb/tsdbRead.c | 9 ++- source/dnode/vnode/src/tsdb/tsdbReadImpl.c | 2 +- source/dnode/vnode/src/tsdb/tsdbScan.c | 3 +- source/dnode/vnode/src/tsdb/tsdbSma.c | 4 +- source/dnode/vnode/src/tsdb/tsdbTDBImpl.c | 6 +- source/dnode/vnode/src/tsdb/tsdbWrite.c | 2 +- source/dnode/vnode/src/vnd/vnodeBufPool.c | 2 +- source/dnode/vnode/src/vnd/vnodeCfg.c | 2 +- source/dnode/vnode/src/vnd/vnodeCommit.c | 4 +- source/dnode/vnode/src/vnd/vnodeInt.c | 2 +- source/dnode/vnode/src/vnd/vnodeModule.c | 2 +- source/dnode/vnode/src/vnd/vnodeOpen.c | 3 +- source/dnode/vnode/src/vnd/vnodeQuery.c | 6 +- source/dnode/vnode/src/vnd/vnodeSvr.c | 6 +- source/dnode/vnode/src/vnd/vnodeSync.c | 11 ++-- 43 files changed, 172 insertions(+), 250 deletions(-) delete mode 100644 source/dnode/vnode/src/inc/vnodeSync.h delete mode 100644 source/dnode/vnode/src/tsdb/tsdbOptions.c diff --git a/source/dnode/vnode/CMakeLists.txt b/source/dnode/vnode/CMakeLists.txt index a9f3c9f49b..2a784c112e 100644 --- a/source/dnode/vnode/CMakeLists.txt +++ b/source/dnode/vnode/CMakeLists.txt @@ -19,7 +19,6 @@ target_sources( "src/meta/metaOpen.c" "src/meta/metaIdx.c" "src/meta/metaTable.c" - "src/meta/metaTDBImpl.c" "src/meta/metaQuery.c" "src/meta/metaCommit.c" "src/meta/metaEntry.c" @@ -33,7 +32,6 @@ target_sources( "src/tsdb/tsdbFS.c" "src/tsdb/tsdbMain.c" "src/tsdb/tsdbMemTable.c" - "src/tsdb/tsdbOptions.c" "src/tsdb/tsdbRead.c" "src/tsdb/tsdbReadImpl.c" "src/tsdb/tsdbScan.c" diff --git a/source/dnode/vnode/inc/vnode.h b/source/dnode/vnode/inc/vnode.h index 8984b3d8ae..3314d2f264 100644 --- a/source/dnode/vnode/inc/vnode.h +++ b/source/dnode/vnode/inc/vnode.h @@ -74,7 +74,7 @@ typedef struct SMeta SMeta; // todo: remove typedef struct SMetaReader SMetaReader; typedef struct SMetaEntry SMetaEntry; -void metaReaderInit(SMetaReader *pReader, SVnode *pVnode, int32_t flags); +void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags); void metaReaderClear(SMetaReader *pReader); int metaReadNext(SMetaReader *pReader); @@ -90,8 +90,8 @@ int metaTbCursorNext(SMTbCursor *pTbCur); #endif // tsdb -typedef struct STsdb STsdb; -typedef void *tsdbReaderT; +typedef struct STsdb STsdb; +typedef void *tsdbReaderT; #define BLOCK_LOAD_OFFSET_SEQ_ORDER 1 #define BLOCK_LOAD_TABLE_SEQ_ORDER 2 @@ -111,7 +111,7 @@ bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle); void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo); int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg **pBlockStatis); SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList); -void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond); +void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond); void tsdbDestroyTableGroup(STableGroupInfo *pGroupList); int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo); int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo); diff --git a/source/dnode/vnode/src/inc/meta.h b/source/dnode/vnode/src/inc/meta.h index 076cb74852..9a0b1f4578 100644 --- a/source/dnode/vnode/src/inc/meta.h +++ b/source/dnode/vnode/src/inc/meta.h @@ -16,13 +16,14 @@ #ifndef _TD_VNODE_META_H_ #define _TD_VNODE_META_H_ +#include "vnodeInt.h" + #ifdef __cplusplus extern "C" { #endif typedef struct SMetaIdx SMetaIdx; typedef struct SMetaDB SMetaDB; -typedef struct SMCtbCursor SMCtbCursor; typedef struct SMSmaCursor SMSmaCursor; // metaDebug ================== @@ -36,22 +37,16 @@ typedef struct SMSmaCursor SMSmaCursor; // clang-format on // metaOpen ================== -int metaOpen(SVnode* pVnode, SMeta** ppMeta); -int metaClose(SMeta* pMeta); // metaEntry ================== int metaEncodeEntry(SCoder* pCoder, const SMetaEntry* pME); int metaDecodeEntry(SCoder* pCoder, SMetaEntry* pME); // metaTable ================== -int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq); -int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); // metaQuery ================== int metaGetTableEntryByVersion(SMetaReader* pReader, int64_t version, tb_uid_t uid); -int metaGetTableEntryByUid(SMetaReader* pReader, tb_uid_t uid); -int metaGetTableEntryByName(SMetaReader* pReader, const char* name); // metaIdx ================== int metaOpenIdx(SMeta* pMeta); @@ -60,8 +55,6 @@ int metaSaveTableToIdx(SMeta* pMeta, const STbCfg* pTbOptions); int metaRemoveTableFromIdx(SMeta* pMeta, tb_uid_t uid); // metaCommit ================== -int metaBegin(SMeta* pMeta); - static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64(); } struct SMeta { @@ -106,27 +99,12 @@ typedef struct { } STtlIdxKey; #if 1 -#define META_SUPER_TABLE TD_SUPER_TABLE -#define META_CHILD_TABLE TD_CHILD_TABLE -#define META_NORMAL_TABLE TD_NORMAL_TABLE // int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle); -int metaDropTable(SMeta* pMeta, tb_uid_t uid); -int metaCommit(SMeta* pMeta); -int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg); -int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); -SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); -STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); -void* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid, bool isDecode); -STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid); -SArray* metaGetSmaTbUids(SMeta* pMeta, bool isDup); -int metaGetTbNum(SMeta* pMeta); -SMSmaCursor* metaOpenSmaCursor(SMeta* pMeta, tb_uid_t uid); -void metaCloseSmaCursor(SMSmaCursor* pSmaCur); -int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur); -SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid); -void metaCloseCtbCurosr(SMCtbCursor* pCtbCur); -tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur); +int metaDropTable(SMeta* pMeta, tb_uid_t uid); +SMSmaCursor* metaOpenSmaCursor(SMeta* pMeta, tb_uid_t uid); +void metaCloseSmaCursor(SMSmaCursor* pSmaCur); +int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur); #ifndef META_REFACT // SMetaDB diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 347d28f1ea..b0461067e1 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -16,6 +16,8 @@ #ifndef _TD_VNODE_TQ_H_ #define _TD_VNODE_TQ_H_ +#include "vnodeInt.h" + #include "executor.h" #include "os.h" #include "tcache.h" @@ -237,17 +239,7 @@ int tqInit(); void tqCleanUp(); // open in each vnode -STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal); -void tqClose(STQ*); // required by vnode -int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); -int tqCommit(STQ*); - -int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); -int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); -int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); -int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**); int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**); diff --git a/source/dnode/vnode/src/inc/tsdb.h b/source/dnode/vnode/src/inc/tsdb.h index 5972e0f9d2..3349accb88 100644 --- a/source/dnode/vnode/src/inc/tsdb.h +++ b/source/dnode/vnode/src/inc/tsdb.h @@ -16,6 +16,8 @@ #ifndef _TD_VNODE_TSDB_H_ #define _TD_VNODE_TSDB_H_ +#include "vnodeInt.h" + #ifdef __cplusplus extern "C" { #endif @@ -43,7 +45,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKe TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo); // tsdbCommit ================ -int tsdbBegin(STsdb *pTsdb); #if 1 @@ -60,16 +61,9 @@ struct STable { #define TABLE_TID(t) (t)->tid #define TABLE_UID(t) (t)->uid -int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb); -int tsdbClose(STsdb *pTsdb); -int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *pRsp); int tsdbPrepareCommit(STsdb *pTsdb); -int tsdbCommit(STsdb *pTsdb); int32_t tsdbInitSma(STsdb *pTsdb); -int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg); int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg); -int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version); -int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg); int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid); int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg); void tsdbCleanupReadHandle(tsdbReaderT queryHandle); @@ -237,12 +231,6 @@ static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator *pIter) { return TD_ROW_KEY(row); } -// tsdbOptions -extern const STsdbCfg defautlTsdbOptions; - -int tsdbValidateOptions(const STsdbCfg *); -void tsdbOptionsCopy(STsdbCfg *pDest, const STsdbCfg *pSrc); - // tsdbReadImpl typedef struct SReadH SReadH; diff --git a/source/dnode/vnode/src/inc/tsdbSma.h b/source/dnode/vnode/src/inc/tsdbSma.h index e20d2989b9..8fb18ddfea 100644 --- a/source/dnode/vnode/src/inc/tsdbSma.h +++ b/source/dnode/vnode/src/inc/tsdbSma.h @@ -16,9 +16,7 @@ #ifndef _TD_VNODE_TSDB_SMA_H_ #define _TD_VNODE_TSDB_SMA_H_ -#include "os.h" -#include "thash.h" -#include "tmsg.h" +#include "tsdb.h" #ifdef __cplusplus extern "C" { @@ -32,12 +30,6 @@ struct STbDdlH { __tb_ddl_fn_t fp; }; -typedef struct { - tb_uid_t suid; - SArray *tbUids; - SHashObj *uidHash; -} STbUidStore; - static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) { ASSERT(*pStore == NULL); *pStore = taosMemoryCalloc(1, sizeof(STbUidStore)); diff --git a/source/dnode/vnode/src/inc/vnd.h b/source/dnode/vnode/src/inc/vnd.h index 055120568e..3f5435ee47 100644 --- a/source/dnode/vnode/src/inc/vnd.h +++ b/source/dnode/vnode/src/inc/vnd.h @@ -16,6 +16,10 @@ #ifndef _TD_VND_H_ #define _TD_VND_H_ +#include "sync.h" +#include "syncTools.h" +#include "vnodeInt.h" + #ifdef __cplusplus extern "C" { #endif @@ -58,11 +62,9 @@ struct SVBufPool { SVBufPoolNode node; }; -int vnodeOpenBufPool(SVnode* pVnode, int64_t size); -int vnodeCloseBufPool(SVnode* pVnode); -void vnodeBufPoolReset(SVBufPool* pPool); -void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); -void vnodeBufPoolFree(SVBufPool* pPool, void* p); +int vnodeOpenBufPool(SVnode* pVnode, int64_t size); +int vnodeCloseBufPool(SVnode* pVnode); +void vnodeBufPoolReset(SVBufPool* pPool); // vnodeQuery ==================== int vnodeQueryOpen(SVnode* pVnode); @@ -79,6 +81,20 @@ int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo); int vnodeSyncCommit(SVnode* pVnode); int vnodeAsyncCommit(SVnode* pVnode); +// vnodeCommit ==================== +int32_t vnodeSyncOpen(SVnode* pVnode, char* path); +int32_t vnodeSyncStart(SVnode* pVnode); +void vnodeSyncClose(SVnode* pVnode); +void vnodeSyncSetQ(SVnode* pVnode, void* qHandle); +void vnodeSyncSetRpc(SVnode* pVnode, void* rpcHandle); +int32_t vnodeSyncEqMsg(void* qHandle, SRpcMsg* pMsg); +int32_t vnodeSendMsg(void* rpcHandle, const SEpSet* pEpSet, SRpcMsg* pMsg); +void vnodeSyncCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); +void vnodeSyncPreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); +void vnodeSyncRollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta); +int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot); +SSyncFSM* syncVnodeMakeFsm(); + #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnodeInt.h b/source/dnode/vnode/src/inc/vnodeInt.h index 0f81ce8dfc..2d4cee3cad 100644 --- a/source/dnode/vnode/src/inc/vnodeInt.h +++ b/source/dnode/vnode/src/inc/vnodeInt.h @@ -59,6 +59,55 @@ typedef struct SQWorkerMgmt SQHandle; #define VNODE_TQ_DIR "tq" #define VNODE_WAL_DIR "wal" +// vnd.h +void* vnodeBufPoolMalloc(SVBufPool* pPool, int size); +void vnodeBufPoolFree(SVBufPool* pPool, void* p); + +// meta +typedef struct SMCtbCursor SMCtbCursor; +typedef struct STbUidStore STbUidStore; + +int metaOpen(SVnode* pVnode, SMeta** ppMeta); +int metaClose(SMeta* pMeta); +int metaBegin(SMeta* pMeta); +int metaCommit(SMeta* pMeta); +int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq); +int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq); +SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline); +STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver); +int metaGetTableEntryByUid(SMetaReader* pReader, tb_uid_t uid); +int metaGetTableEntryByName(SMetaReader* pReader, const char* name); +int metaGetTbNum(SMeta* pMeta); +SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid); +void metaCloseCtbCurosr(SMCtbCursor* pCtbCur); +tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur); +SArray* metaGetSmaTbUids(SMeta* pMeta, bool isDup); +void* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid, bool isDecode); +STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid); +int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg); +int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid); + +// tsdb +int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb); +int tsdbClose(STsdb* pTsdb); +int tsdbBegin(STsdb* pTsdb); +int tsdbCommit(STsdb* pTsdb); +int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version); +int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg); +int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg); +int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp); + +// tq +STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal); +void tqClose(STQ*); +int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver); +int tqCommit(STQ*); +int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen); +int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId); +int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen); +int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId); +int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId); + typedef struct { int8_t streamType; // sma or other int8_t dstType; @@ -106,6 +155,12 @@ struct SVnode { SQHandle* pQuery; }; +struct STbUidStore { + tb_uid_t suid; + SArray* tbUids; + SHashObj* uidHash; +}; + #define TD_VID(PVNODE) (PVNODE)->config.vgId typedef struct STbDdlH STbDdlH; @@ -113,18 +168,6 @@ typedef struct STbDdlH STbDdlH; // sma void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data); -#include "vnd.h" - -#include "meta.h" - -#include "tsdb.h" - -#include "tq.h" - -#include "vnodeSync.h" - -#include "tsdbSma.h" - #ifdef __cplusplus } #endif diff --git a/source/dnode/vnode/src/inc/vnodeSync.h b/source/dnode/vnode/src/inc/vnodeSync.h deleted file mode 100644 index fea94c4607..0000000000 --- a/source/dnode/vnode/src/inc/vnodeSync.h +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef _TD_VNODE_SYNC_H_ -#define _TD_VNODE_SYNC_H_ - -#ifdef __cplusplus -extern "C" { -#endif - -int32_t vnodeSyncOpen(SVnode *pVnode, char *path); -int32_t vnodeSyncStart(SVnode *pVnode); -void vnodeSyncClose(SVnode *pVnode); - -void vnodeSyncSetQ(SVnode *pVnode, void *qHandle); -void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle); - -int32_t vnodeSyncEqMsg(void *qHandle, SRpcMsg *pMsg); -int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg); - -void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta); -void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta); -void vnodeSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta); -int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot); - -SSyncFSM *syncVnodeMakeFsm(); - -#ifdef __cplusplus -} -#endif - -#endif /*_TD_VNODE_SYNC_H_*/ diff --git a/source/dnode/vnode/src/meta/metaCommit.c b/source/dnode/vnode/src/meta/metaCommit.c index 246294d72f..456c4fd7ee 100644 --- a/source/dnode/vnode/src/meta/metaCommit.c +++ b/source/dnode/vnode/src/meta/metaCommit.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "meta.h" static FORCE_INLINE void *metaMalloc(void *pPool, size_t size) { return vnodeBufPoolMalloc((SVBufPool *)pPool, size); } static FORCE_INLINE void metaFree(void *pPool, void *p) { vnodeBufPoolFree((SVBufPool *)pPool, p); } diff --git a/source/dnode/vnode/src/meta/metaEntry.c b/source/dnode/vnode/src/meta/metaEntry.c index a3cb812d93..e71a748f97 100644 --- a/source/dnode/vnode/src/meta/metaEntry.c +++ b/source/dnode/vnode/src/meta/metaEntry.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "meta.h" int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) { if (tStartEncode(pCoder) < 0) return -1; diff --git a/source/dnode/vnode/src/meta/metaIdx.c b/source/dnode/vnode/src/meta/metaIdx.c index dbab5dff09..853b2ecefb 100644 --- a/source/dnode/vnode/src/meta/metaIdx.c +++ b/source/dnode/vnode/src/meta/metaIdx.c @@ -16,7 +16,7 @@ #ifdef USE_INVERTED_INDEX #include "index.h" #endif -#include "vnodeInt.h" +#include "meta.h" struct SMetaIdx { #ifdef USE_INVERTED_INDEX diff --git a/source/dnode/vnode/src/meta/metaOpen.c b/source/dnode/vnode/src/meta/metaOpen.c index 940bf3e440..e65f5229af 100644 --- a/source/dnode/vnode/src/meta/metaOpen.c +++ b/source/dnode/vnode/src/meta/metaOpen.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "meta.h" static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int skmDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); diff --git a/source/dnode/vnode/src/meta/metaQuery.c b/source/dnode/vnode/src/meta/metaQuery.c index 964637c976..dbfa8fe9ed 100644 --- a/source/dnode/vnode/src/meta/metaQuery.c +++ b/source/dnode/vnode/src/meta/metaQuery.c @@ -13,12 +13,12 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "meta.h" -void metaReaderInit(SMetaReader *pReader, SVnode *pVnode, int32_t flags) { +void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) { memset(pReader, 0, sizeof(*pReader)); pReader->flags = flags; - pReader->pMeta = pVnode->pMeta; + pReader->pMeta = pMeta; } void metaReaderClear(SMetaReader *pReader) { @@ -91,7 +91,7 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta) { return NULL; } - metaReaderInit(&pTbCur->mr, pMeta->pVnode, 0); + metaReaderInit(&pTbCur->mr, pMeta, 0); tdbDbcOpen(pMeta->pUidIdx, &pTbCur->pDbc); @@ -122,7 +122,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) { } metaGetTableEntryByVersion(&pTbCur->mr, *(int64_t *)pTbCur->pVal, *(tb_uid_t *)pTbCur->pKey); - if (pTbCur->mr.me.type == META_SUPER_TABLE) { + if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) { continue; } @@ -234,7 +234,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) { STSchemaBuilder sb = {0}; SSchema *pSchema; - metaReaderInit(&mr, pMeta->pVnode, 0); + metaReaderInit(&mr, pMeta, 0); metaGetTableEntryByUid(&mr, uid); if (mr.me.type == TSDB_CHILD_TABLE) { diff --git a/source/dnode/vnode/src/meta/metaTable.c b/source/dnode/vnode/src/meta/metaTable.c index 014af4b10d..452d52ce5e 100644 --- a/source/dnode/vnode/src/meta/metaTable.c +++ b/source/dnode/vnode/src/meta/metaTable.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "meta.h" static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME); static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME); @@ -37,7 +37,7 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) { SMetaReader mr = {0}; // validate req - metaReaderInit(&mr, pMeta->pVnode, 0); + metaReaderInit(&mr, pMeta, 0); if (metaGetTableEntryByName(&mr, pReq->name) == 0) { // TODO: just for pass case #if 0 @@ -91,7 +91,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) { pReq->ctime = taosGetTimestampSec(); // validate req - metaReaderInit(&mr, pMeta->pVnode, 0); + metaReaderInit(&mr, pMeta, 0); if (metaGetTableEntryByName(&mr, pReq->name) == 0) { terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST; metaReaderClear(&mr); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 8cabada511..28a489752a 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "tq.h" int32_t tqInit() { // @@ -176,9 +176,9 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_ atomic_store_ptr(&pExec->pushHandle.handle, NULL); taosWUnLockLatch(&pExec->pushHandle.lock); - vDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld", - TD_VID(pTq->pVnode), fetchOffset, pExec->pushHandle.consumerId, pExec->pushHandle.epoch, rsp.blockNum, - rsp.reqOffset, rsp.rspOffset); + tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld", + TD_VID(pTq->pVnode), fetchOffset, pExec->pushHandle.consumerId, pExec->pushHandle.epoch, rsp.blockNum, + rsp.reqOffset, rsp.rspOffset); // TODO destroy taosArrayDestroy(rsp.blockData); @@ -390,8 +390,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { fetchOffset = pReq->currentOffset + 1; } - vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, - TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); + tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, + TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); STqExec* pExec = taosHashGet(pTq->execs, pReq->subKey, strlen(pReq->subKey)); ASSERT(pExec); @@ -418,16 +418,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { while (1) { consumerEpoch = atomic_load_32(&pExec->epoch); if (consumerEpoch > reqEpoch) { - vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", - consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch); + tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", + consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch); break; } taosThreadMutexLock(&pExec->pWalReader->mutex); if (walFetchHead(pExec->pWalReader, fetchOffset, pHeadWithCkSum) < 0) { - vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, - TD_VID(pTq->pVnode), fetchOffset); + tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, + TD_VID(pTq->pVnode), fetchOffset); taosThreadMutexUnlock(&pExec->pWalReader->mutex); break; } @@ -448,7 +448,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // TODO: no more log, set timer to wait blocking time // if data inserted during waiting, launch query and // response to user - vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, + tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset); #if 0 @@ -476,8 +476,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } #endif - vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, - TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); + tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, + TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); if (pHead->msgType == TDMT_VND_SUBMIT) { SSubmitReq* pCont = (SSubmitReq*)&pHead->body; @@ -591,8 +591,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->code = 0; tmsgSendRsp(pMsg); - vDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld", - TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset); + tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld", + TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset); // TODO destroy taosArrayDestroy(rsp.blockData); @@ -618,7 +618,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { fetchOffset = pReq->currentOffset + 1; } - vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, + tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch, TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset); SMqPollRspV2 rspV2 = {0}; @@ -660,7 +660,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { return 0; } - vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, + tqDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch, TD_VID(pTq->pVnode)); rspV2.reqOffset = pReq->currentOffset; @@ -671,7 +671,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // TODO consumerEpoch = atomic_load_32(&pConsumer->epoch); if (consumerEpoch > reqEpoch) { - vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", + tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch); break; } @@ -680,11 +680,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { // TODO: no more log, set timer to wait blocking time // if data inserted during waiting, launch query and // response to user - vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, + tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset); break; } - vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, + tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, pHead->msgType); /*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/ /*pHead = pTopic->pReadhandle->pHead;*/ @@ -709,7 +709,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { } if (taosArrayGetSize(pRes) == 0) { - vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, + tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset); fetchOffset++; rspV2.skipLogNum++; @@ -770,7 +770,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->pCont = buf; pMsg->contLen = msgLen; pMsg->code = 0; - vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset, + tqDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset, pHead->msgType, consumerId, pReq->epoch); tmsgSendRsp(pMsg); taosMemoryFree(pHead); @@ -804,7 +804,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) { pMsg->contLen = tlen; pMsg->code = 0; tmsgSendRsp(pMsg); - vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId, + tqDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch); /*}*/ diff --git a/source/dnode/vnode/src/tq/tqCommit.c b/source/dnode/vnode/src/tq/tqCommit.c index 8e59243a9c..e31566f3fa 100644 --- a/source/dnode/vnode/src/tq/tqCommit.c +++ b/source/dnode/vnode/src/tq/tqCommit.c @@ -13,4 +13,4 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "tq.h" diff --git a/source/dnode/vnode/src/tq/tqMetaStore.c b/source/dnode/vnode/src/tq/tqMetaStore.c index 72469b74ac..ca09cc1dc1 100644 --- a/source/dnode/vnode/src/tq/tqMetaStore.c +++ b/source/dnode/vnode/src/tq/tqMetaStore.c @@ -12,7 +12,7 @@ * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "tq.h" // #include // #include // #include @@ -86,7 +86,7 @@ STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize serializer, F } strcpy(pMeta->dirPath, path); - char *name = taosMemoryMalloc(pathLen + 10) ; + char* name = taosMemoryMalloc(pathLen + 10); strcpy(name, path); if (!taosDirExist(name) && taosMkDir(name) != 0) { diff --git a/source/dnode/vnode/src/tq/tqOffset.c b/source/dnode/vnode/src/tq/tqOffset.c index 3cff87340d..90f512611b 100644 --- a/source/dnode/vnode/src/tq/tqOffset.c +++ b/source/dnode/vnode/src/tq/tqOffset.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "vnodeInt.h" +#include "tq.h" enum ETqOffsetPersist { TQ_OFFSET_PERSIST__LAZY = 1, diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index aeb9f27eab..cf1154071d 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "tq.h" STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) { STqReadHandle* pReadHandle = taosMemoryMalloc(sizeof(STqReadHandle)); diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit.c b/source/dnode/vnode/src/tsdb/tsdbCommit.c index f5483b1141..29f148b64d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "tsdb.h" #define TSDB_MAX_SUBBLOCKS 8 diff --git a/source/dnode/vnode/src/tsdb/tsdbCommit2.c b/source/dnode/vnode/src/tsdb/tsdbCommit2.c index b16f2f0fdb..585ef63531 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCommit2.c +++ b/source/dnode/vnode/src/tsdb/tsdbCommit2.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "tsdb.h" int tsdbBegin(STsdb *pTsdb) { STsdbMemTable *pMem; diff --git a/source/dnode/vnode/src/tsdb/tsdbCompact.c b/source/dnode/vnode/src/tsdb/tsdbCompact.c index e24d5974af..9e0721815a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbCompact.c +++ b/source/dnode/vnode/src/tsdb/tsdbCompact.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ #if 0 -#include "tsdbint.h" +#include "tsdb.h" typedef struct { STable * pTable; @@ -33,15 +33,15 @@ typedef struct { SDataCols *pDataCols; } SCompactH; -#define TSDB_COMPACT_WSET(pComph) (&((pComph)->wSet)) -#define TSDB_COMPACT_REPO(pComph) TSDB_READ_REPO(&((pComph)->readh)) +#define TSDB_COMPACT_WSET(pComph) (&((pComph)->wSet)) +#define TSDB_COMPACT_REPO(pComph) TSDB_READ_REPO(&((pComph)->readh)) #define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD) #define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA) #define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST) #define TSDB_COMPACT_SMAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAD) #define TSDB_COMPACT_SMAL_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAL) -#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh)) -#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh)) +#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh)) +#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh)) static int tsdbAsyncCompact(STsdbRepo *pRepo); static void tsdbStartCompact(STsdbRepo *pRepo); @@ -531,4 +531,3 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) { } #endif - diff --git a/source/dnode/vnode/src/tsdb/tsdbFS.c b/source/dnode/vnode/src/tsdb/tsdbFS.c index 0012a50ccf..91a7e1cd54 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFS.c +++ b/source/dnode/vnode/src/tsdb/tsdbFS.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "tsdb.h" typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T; static const char *tsdbTxnFname[] = {"current.t", "current"}; diff --git a/source/dnode/vnode/src/tsdb/tsdbFile.c b/source/dnode/vnode/src/tsdb/tsdbFile.c index 2fbe819476..74e3c66a9d 100644 --- a/source/dnode/vnode/src/tsdb/tsdbFile.c +++ b/source/dnode/vnode/src/tsdb/tsdbFile.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "tsdb.h" static const char *TSDB_FNAME_SUFFIX[] = { "head", // TSDB_FILE_HEAD diff --git a/source/dnode/vnode/src/tsdb/tsdbMain.c b/source/dnode/vnode/src/tsdb/tsdbMain.c index c1025e13eb..de5ff9ac91 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMain.c +++ b/source/dnode/vnode/src/tsdb/tsdbMain.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "tsdb.h" int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb) { STsdb *pTsdb = NULL; diff --git a/source/dnode/vnode/src/tsdb/tsdbMemTable.c b/source/dnode/vnode/src/tsdb/tsdbMemTable.c index d278a8d98d..323fc7970b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbMemTable.c +++ b/source/dnode/vnode/src/tsdb/tsdbMemTable.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "tsdb.h" static STbData *tsdbNewTbData(tb_uid_t uid); static void tsdbFreeTbData(STbData *pTbData); diff --git a/source/dnode/vnode/src/tsdb/tsdbOptions.c b/source/dnode/vnode/src/tsdb/tsdbOptions.c deleted file mode 100644 index 3560c9feaa..0000000000 --- a/source/dnode/vnode/src/tsdb/tsdbOptions.c +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "vnodeInt.h" - -const STsdbCfg defautlTsdbOptions = {.precision = 0, - .lruCacheSize = 0, - .days = 10, - .minRows = 100, - .maxRows = 4096, - .keep2 = 3650, - .keep0 = 3650, - .keep1 = 3650, - .update = 0, - .compression = TWO_STAGE_COMP}; - -int tsdbValidateOptions(const STsdbCfg *pTsdbOptions) { - // TODO - return 0; -} - -void tsdbOptionsCopy(STsdbCfg *pDest, const STsdbCfg *pSrc) { memcpy(pDest, pSrc, sizeof(STsdbCfg)); } \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 2c8819f60b..302ee89e51 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "tsdb.h" #define EXTRA_BYTES 2 #define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC) @@ -3618,7 +3618,7 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) { SMetaReader mr = {0}; - metaReaderInit(&mr, ((SMeta*)pMeta)->pVnode, 0); + metaReaderInit(&mr, (SMeta*)pMeta, 0); if (metaGetTableEntryByUid(&mr, uid) < 0) { tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId); @@ -3628,7 +3628,7 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch tsdbDebug("%p succeed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId); } - if (mr.me.type != META_SUPER_TABLE) { + if (mr.me.type != TSDB_SUPER_TABLE) { tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId); terrno = TSDB_CODE_OPS_NOT_SUPPORT; // basically, this error is caused by invalid sql issued by client @@ -3687,10 +3687,9 @@ int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo) { return TSDB_CODE_SUCCESS; } int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) { - SMeta* metaP = (SMeta*)pMeta; SMetaReader mr = {0}; - metaReaderInit(&mr, metaP->pVnode, 0); + metaReaderInit(&mr, (SMeta*)pMeta, 0); if (metaGetTableEntryByUid(&mr, uid) < 0) { terrno = TSDB_CODE_TDB_INVALID_TABLE_ID; diff --git a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c index f18f36e07b..c39780372b 100644 --- a/source/dnode/vnode/src/tsdb/tsdbReadImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbReadImpl.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "tsdb.h" #define TSDB_KEY_COL_OFFSET 0 diff --git a/source/dnode/vnode/src/tsdb/tsdbScan.c b/source/dnode/vnode/src/tsdb/tsdbScan.c index c0e468e640..cc6fb473d0 100644 --- a/source/dnode/vnode/src/tsdb/tsdbScan.c +++ b/source/dnode/vnode/src/tsdb/tsdbScan.c @@ -13,9 +13,8 @@ * along with this program. If not, see . */ - #if 0 -#include "tsdbint.h" +#include "tsdb.h" #ifndef _TSDB_PLUGINS int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; } diff --git a/source/dnode/vnode/src/tsdb/tsdbSma.c b/source/dnode/vnode/src/tsdb/tsdbSma.c index 930ea58151..c9e33cefc7 100644 --- a/source/dnode/vnode/src/tsdb/tsdbSma.c +++ b/source/dnode/vnode/src/tsdb/tsdbSma.c @@ -13,7 +13,8 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "tsdbSma.h" +#include "tsdb.h" static const char *TSDB_SMA_DNAME[] = { "", // TSDB_SMA_TYPE_BLOCK @@ -678,7 +679,6 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers return TSDB_CODE_FAILED; } - if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) { terrno = TSDB_CODE_TDB_INIT_FAILED; return TSDB_CODE_FAILED; diff --git a/source/dnode/vnode/src/tsdb/tsdbTDBImpl.c b/source/dnode/vnode/src/tsdb/tsdbTDBImpl.c index ebfa1ecaeb..ac3baf8c3a 100644 --- a/source/dnode/vnode/src/tsdb/tsdbTDBImpl.c +++ b/source/dnode/vnode/src/tsdb/tsdbTDBImpl.c @@ -15,14 +15,14 @@ #define ALLOW_FORBID_FUNC -#include "vnodeInt.h" +#include "tsdb.h" int32_t tsdbOpenDBEnv(TENV **ppEnv, const char *path) { - int ret = 0; + int ret = 0; if (path == NULL) return -1; - ret = tdbEnvOpen(path, 4096, 256, ppEnv); // use as param + ret = tdbEnvOpen(path, 4096, 256, ppEnv); // use as param if (ret != 0) { tsdbError("Failed to create tsdb db env, ret = %d", ret); diff --git a/source/dnode/vnode/src/tsdb/tsdbWrite.c b/source/dnode/vnode/src/tsdb/tsdbWrite.c index 79a56c3c9d..82fa6b9ae5 100644 --- a/source/dnode/vnode/src/tsdb/tsdbWrite.c +++ b/source/dnode/vnode/src/tsdb/tsdbWrite.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "tsdb.h" static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg); diff --git a/source/dnode/vnode/src/vnd/vnodeBufPool.c b/source/dnode/vnode/src/vnd/vnodeBufPool.c index 0125cdc82f..9122913cda 100644 --- a/source/dnode/vnode/src/vnd/vnodeBufPool.c +++ b/source/dnode/vnode/src/vnd/vnodeBufPool.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "vnd.h" /* ------------------------ STRUCTURES ------------------------ */ diff --git a/source/dnode/vnode/src/vnd/vnodeCfg.c b/source/dnode/vnode/src/vnd/vnodeCfg.c index 905bf89be4..df8cf8d503 100644 --- a/source/dnode/vnode/src/vnd/vnodeCfg.c +++ b/source/dnode/vnode/src/vnd/vnodeCfg.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "vnd.h" const SVnodeCfg vnodeCfgDefault = { .vgId = -1, diff --git a/source/dnode/vnode/src/vnd/vnodeCommit.c b/source/dnode/vnode/src/vnd/vnodeCommit.c index cee1018b71..1c0e65aa95 100644 --- a/source/dnode/vnode/src/vnd/vnodeCommit.c +++ b/source/dnode/vnode/src/vnd/vnodeCommit.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "vnd.h" #define VND_INFO_FNAME "vnode.json" #define VND_INFO_FNAME_TMP "vnode_tmp.json" @@ -175,7 +175,7 @@ int vnodeAsyncCommit(SVnode *pVnode) { vnodeWaitCommit(pVnode); // vnodeBufPoolSwitch(pVnode); - tsdbPrepareCommit(pVnode->pTsdb); + // tsdbPrepareCommit(pVnode->pTsdb); vnodeScheduleTask(vnodeCommitImpl, pVnode); diff --git a/source/dnode/vnode/src/vnd/vnodeInt.c b/source/dnode/vnode/src/vnd/vnodeInt.c index 10d8154a15..14dfc75ced 100644 --- a/source/dnode/vnode/src/vnd/vnodeInt.c +++ b/source/dnode/vnode/src/vnd/vnodeInt.c @@ -14,7 +14,7 @@ */ #define _DEFAULT_SOURCE -#include "vnodeInt.h" +#include "vnd.h" // #include "vnodeInt.h" int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; } diff --git a/source/dnode/vnode/src/vnd/vnodeModule.c b/source/dnode/vnode/src/vnd/vnodeModule.c index 2b5b46a45d..efae74b55a 100644 --- a/source/dnode/vnode/src/vnd/vnodeModule.c +++ b/source/dnode/vnode/src/vnd/vnodeModule.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "vnd.h" typedef struct SVnodeTask SVnodeTask; struct SVnodeTask { diff --git a/source/dnode/vnode/src/vnd/vnodeOpen.c b/source/dnode/vnode/src/vnd/vnodeOpen.c index 38f39ae1e3..ee64976df9 100644 --- a/source/dnode/vnode/src/vnd/vnodeOpen.c +++ b/source/dnode/vnode/src/vnd/vnodeOpen.c @@ -13,8 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" -#include "vnodeSync.h" +#include "vnd.h" int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) { SVnodeInfo info = {0}; diff --git a/source/dnode/vnode/src/vnd/vnodeQuery.c b/source/dnode/vnode/src/vnd/vnodeQuery.c index bcb424d133..8da94ab78b 100644 --- a/source/dnode/vnode/src/vnd/vnodeQuery.c +++ b/source/dnode/vnode/src/vnd/vnodeQuery.c @@ -13,7 +13,7 @@ * along with this program. If not, see . */ -#include "vnodeInt.h" +#include "vnd.h" int vnodeQueryOpen(SVnode *pVnode) { return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb); @@ -51,7 +51,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { } // query meta - metaReaderInit(&mer1, pVnode, 0); + metaReaderInit(&mer1, pVnode->pMeta, 0); if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) { goto _exit; @@ -67,7 +67,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) { schemaTag = mer1.me.stbEntry.schemaTag; metaRsp.suid = mer1.me.uid; } else if (mer1.me.type == TSDB_CHILD_TABLE) { - metaReaderInit(&mer2, pVnode, 0); + metaReaderInit(&mer2, pVnode->pMeta, 0); if (metaGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit; strcpy(metaRsp.stbName, mer2.me.name); diff --git a/source/dnode/vnode/src/vnd/vnodeSvr.c b/source/dnode/vnode/src/vnd/vnodeSvr.c index 61cedc6b50..efcc82853c 100644 --- a/source/dnode/vnode/src/vnd/vnodeSvr.c +++ b/source/dnode/vnode/src/vnd/vnodeSvr.c @@ -13,9 +13,7 @@ * along with this program. If not, see . */ -#include "sync.h" -#include "syncTools.h" -#include "vnodeInt.h" +#include "vnd.h" static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp); static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp); @@ -201,9 +199,7 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) { // sync integration int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) { - if (syncEnvIsStart()) { - SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync); assert(pSyncNode != NULL); diff --git a/source/dnode/vnode/src/vnd/vnodeSync.c b/source/dnode/vnode/src/vnd/vnodeSync.c index 5f8bf3c7b5..26393394e9 100644 --- a/source/dnode/vnode/src/vnd/vnodeSync.c +++ b/source/dnode/vnode/src/vnd/vnodeSync.c @@ -13,10 +13,11 @@ * along with this program. If not, see . */ -#include "sync.h" -#include "syncTools.h" -#include "tmsgcb.h" -#include "vnodeInt.h" +#include "vnd.h" +// #include "sync.h" +// #include "syncTools.h" +// #include "tmsgcb.h" +// #include "vnodeInt.h" // sync integration @@ -113,7 +114,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex); syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg); - SVnode * pVnode = (SVnode *)(pFsm->data); + SVnode *pVnode = (SVnode *)(pFsm->data); SyncApplyMsg *pSyncApplyMsg = syncApplyMsgBuild2(pMsg, pVnode->config.vgId, &cbMeta); SRpcMsg applyMsg; syncApplyMsg2RpcMsg(pSyncApplyMsg, &applyMsg);