Merge pull request #11907 from taosdata/feature/vnode_refact1
refactor: vnode
This commit is contained in:
commit
ef67f710c4
|
@ -19,7 +19,6 @@ target_sources(
|
||||||
"src/meta/metaOpen.c"
|
"src/meta/metaOpen.c"
|
||||||
"src/meta/metaIdx.c"
|
"src/meta/metaIdx.c"
|
||||||
"src/meta/metaTable.c"
|
"src/meta/metaTable.c"
|
||||||
"src/meta/metaTDBImpl.c"
|
|
||||||
"src/meta/metaQuery.c"
|
"src/meta/metaQuery.c"
|
||||||
"src/meta/metaCommit.c"
|
"src/meta/metaCommit.c"
|
||||||
"src/meta/metaEntry.c"
|
"src/meta/metaEntry.c"
|
||||||
|
@ -33,7 +32,6 @@ target_sources(
|
||||||
"src/tsdb/tsdbFS.c"
|
"src/tsdb/tsdbFS.c"
|
||||||
"src/tsdb/tsdbMain.c"
|
"src/tsdb/tsdbMain.c"
|
||||||
"src/tsdb/tsdbMemTable.c"
|
"src/tsdb/tsdbMemTable.c"
|
||||||
"src/tsdb/tsdbOptions.c"
|
|
||||||
"src/tsdb/tsdbRead.c"
|
"src/tsdb/tsdbRead.c"
|
||||||
"src/tsdb/tsdbReadImpl.c"
|
"src/tsdb/tsdbReadImpl.c"
|
||||||
"src/tsdb/tsdbScan.c"
|
"src/tsdb/tsdbScan.c"
|
||||||
|
|
|
@ -74,7 +74,7 @@ typedef struct SMeta SMeta; // todo: remove
|
||||||
typedef struct SMetaReader SMetaReader;
|
typedef struct SMetaReader SMetaReader;
|
||||||
typedef struct SMetaEntry SMetaEntry;
|
typedef struct SMetaEntry SMetaEntry;
|
||||||
|
|
||||||
void metaReaderInit(SMetaReader *pReader, SVnode *pVnode, int32_t flags);
|
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
|
||||||
void metaReaderClear(SMetaReader *pReader);
|
void metaReaderClear(SMetaReader *pReader);
|
||||||
int metaReadNext(SMetaReader *pReader);
|
int metaReadNext(SMetaReader *pReader);
|
||||||
|
|
||||||
|
@ -90,8 +90,8 @@ int metaTbCursorNext(SMTbCursor *pTbCur);
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
// tsdb
|
// tsdb
|
||||||
typedef struct STsdb STsdb;
|
typedef struct STsdb STsdb;
|
||||||
typedef void *tsdbReaderT;
|
typedef void *tsdbReaderT;
|
||||||
|
|
||||||
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
|
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
|
||||||
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
|
||||||
|
@ -111,7 +111,7 @@ bool tsdbNextDataBlock(tsdbReaderT pTsdbReadHandle);
|
||||||
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
|
void tsdbRetrieveDataBlockInfo(tsdbReaderT *pTsdbReadHandle, SDataBlockInfo *pBlockInfo);
|
||||||
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg **pBlockStatis);
|
int32_t tsdbRetrieveDataBlockStatisInfo(tsdbReaderT *pTsdbReadHandle, SColumnDataAgg **pBlockStatis);
|
||||||
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
|
SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumnIdList);
|
||||||
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond);
|
void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond *pCond);
|
||||||
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
|
void tsdbDestroyTableGroup(STableGroupInfo *pGroupList);
|
||||||
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
|
int32_t tsdbGetOneTableGroup(void *pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo *pGroupInfo);
|
||||||
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
|
int32_t tsdbGetTableGroupFromIdList(STsdb *tsdb, SArray *pTableIdList, STableGroupInfo *pGroupInfo);
|
||||||
|
|
|
@ -16,13 +16,14 @@
|
||||||
#ifndef _TD_VNODE_META_H_
|
#ifndef _TD_VNODE_META_H_
|
||||||
#define _TD_VNODE_META_H_
|
#define _TD_VNODE_META_H_
|
||||||
|
|
||||||
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef struct SMetaIdx SMetaIdx;
|
typedef struct SMetaIdx SMetaIdx;
|
||||||
typedef struct SMetaDB SMetaDB;
|
typedef struct SMetaDB SMetaDB;
|
||||||
typedef struct SMCtbCursor SMCtbCursor;
|
|
||||||
typedef struct SMSmaCursor SMSmaCursor;
|
typedef struct SMSmaCursor SMSmaCursor;
|
||||||
|
|
||||||
// metaDebug ==================
|
// metaDebug ==================
|
||||||
|
@ -36,22 +37,16 @@ typedef struct SMSmaCursor SMSmaCursor;
|
||||||
// clang-format on
|
// clang-format on
|
||||||
|
|
||||||
// metaOpen ==================
|
// metaOpen ==================
|
||||||
int metaOpen(SVnode* pVnode, SMeta** ppMeta);
|
|
||||||
int metaClose(SMeta* pMeta);
|
|
||||||
|
|
||||||
// metaEntry ==================
|
// metaEntry ==================
|
||||||
int metaEncodeEntry(SCoder* pCoder, const SMetaEntry* pME);
|
int metaEncodeEntry(SCoder* pCoder, const SMetaEntry* pME);
|
||||||
int metaDecodeEntry(SCoder* pCoder, SMetaEntry* pME);
|
int metaDecodeEntry(SCoder* pCoder, SMetaEntry* pME);
|
||||||
|
|
||||||
// metaTable ==================
|
// metaTable ==================
|
||||||
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
|
||||||
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
|
int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq);
|
||||||
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
|
|
||||||
|
|
||||||
// metaQuery ==================
|
// metaQuery ==================
|
||||||
int metaGetTableEntryByVersion(SMetaReader* pReader, int64_t version, tb_uid_t uid);
|
int metaGetTableEntryByVersion(SMetaReader* pReader, int64_t version, tb_uid_t uid);
|
||||||
int metaGetTableEntryByUid(SMetaReader* pReader, tb_uid_t uid);
|
|
||||||
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
|
|
||||||
|
|
||||||
// metaIdx ==================
|
// metaIdx ==================
|
||||||
int metaOpenIdx(SMeta* pMeta);
|
int metaOpenIdx(SMeta* pMeta);
|
||||||
|
@ -60,8 +55,6 @@ int metaSaveTableToIdx(SMeta* pMeta, const STbCfg* pTbOptions);
|
||||||
int metaRemoveTableFromIdx(SMeta* pMeta, tb_uid_t uid);
|
int metaRemoveTableFromIdx(SMeta* pMeta, tb_uid_t uid);
|
||||||
|
|
||||||
// metaCommit ==================
|
// metaCommit ==================
|
||||||
int metaBegin(SMeta* pMeta);
|
|
||||||
|
|
||||||
static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64(); }
|
static FORCE_INLINE tb_uid_t metaGenerateUid(SMeta* pMeta) { return tGenIdPI64(); }
|
||||||
|
|
||||||
struct SMeta {
|
struct SMeta {
|
||||||
|
@ -106,27 +99,12 @@ typedef struct {
|
||||||
} STtlIdxKey;
|
} STtlIdxKey;
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
#define META_SUPER_TABLE TD_SUPER_TABLE
|
|
||||||
#define META_CHILD_TABLE TD_CHILD_TABLE
|
|
||||||
#define META_NORMAL_TABLE TD_NORMAL_TABLE
|
|
||||||
|
|
||||||
// int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
|
// int metaCreateTable(SMeta* pMeta, STbCfg* pTbCfg, STbDdlH* pHandle);
|
||||||
int metaDropTable(SMeta* pMeta, tb_uid_t uid);
|
int metaDropTable(SMeta* pMeta, tb_uid_t uid);
|
||||||
int metaCommit(SMeta* pMeta);
|
SMSmaCursor* metaOpenSmaCursor(SMeta* pMeta, tb_uid_t uid);
|
||||||
int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg);
|
void metaCloseSmaCursor(SMSmaCursor* pSmaCur);
|
||||||
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
|
int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur);
|
||||||
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
|
||||||
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
|
|
||||||
void* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid, bool isDecode);
|
|
||||||
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid);
|
|
||||||
SArray* metaGetSmaTbUids(SMeta* pMeta, bool isDup);
|
|
||||||
int metaGetTbNum(SMeta* pMeta);
|
|
||||||
SMSmaCursor* metaOpenSmaCursor(SMeta* pMeta, tb_uid_t uid);
|
|
||||||
void metaCloseSmaCursor(SMSmaCursor* pSmaCur);
|
|
||||||
int64_t metaSmaCursorNext(SMSmaCursor* pSmaCur);
|
|
||||||
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid);
|
|
||||||
void metaCloseCtbCurosr(SMCtbCursor* pCtbCur);
|
|
||||||
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
|
|
||||||
|
|
||||||
#ifndef META_REFACT
|
#ifndef META_REFACT
|
||||||
// SMetaDB
|
// SMetaDB
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
#ifndef _TD_VNODE_TQ_H_
|
#ifndef _TD_VNODE_TQ_H_
|
||||||
#define _TD_VNODE_TQ_H_
|
#define _TD_VNODE_TQ_H_
|
||||||
|
|
||||||
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
#include "executor.h"
|
#include "executor.h"
|
||||||
#include "os.h"
|
#include "os.h"
|
||||||
#include "tcache.h"
|
#include "tcache.h"
|
||||||
|
@ -237,17 +239,7 @@ int tqInit();
|
||||||
void tqCleanUp();
|
void tqCleanUp();
|
||||||
|
|
||||||
// open in each vnode
|
// open in each vnode
|
||||||
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
|
|
||||||
void tqClose(STQ*);
|
|
||||||
// required by vnode
|
// required by vnode
|
||||||
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
|
||||||
int tqCommit(STQ*);
|
|
||||||
|
|
||||||
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
|
|
||||||
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
|
||||||
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
|
|
||||||
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
|
|
||||||
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
|
|
||||||
|
|
||||||
int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**);
|
int32_t tqSerializeConsumer(const STqConsumer*, STqSerializedHead**);
|
||||||
int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**);
|
int32_t tqDeserializeConsumer(STQ*, const STqSerializedHead*, STqConsumer**);
|
||||||
|
|
|
@ -16,6 +16,8 @@
|
||||||
#ifndef _TD_VNODE_TSDB_H_
|
#ifndef _TD_VNODE_TSDB_H_
|
||||||
#define _TD_VNODE_TSDB_H_
|
#define _TD_VNODE_TSDB_H_
|
||||||
|
|
||||||
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
@ -43,7 +45,6 @@ int tsdbLoadDataFromCache(STable *pTable, SSkipListIterator *pIter, TSKEY maxKe
|
||||||
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
|
TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
|
||||||
|
|
||||||
// tsdbCommit ================
|
// tsdbCommit ================
|
||||||
int tsdbBegin(STsdb *pTsdb);
|
|
||||||
|
|
||||||
#if 1
|
#if 1
|
||||||
|
|
||||||
|
@ -60,16 +61,9 @@ struct STable {
|
||||||
#define TABLE_TID(t) (t)->tid
|
#define TABLE_TID(t) (t)->tid
|
||||||
#define TABLE_UID(t) (t)->uid
|
#define TABLE_UID(t) (t)->uid
|
||||||
|
|
||||||
int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb);
|
|
||||||
int tsdbClose(STsdb *pTsdb);
|
|
||||||
int tsdbInsertData(STsdb *pTsdb, int64_t version, SSubmitReq *pMsg, SSubmitRsp *pRsp);
|
|
||||||
int tsdbPrepareCommit(STsdb *pTsdb);
|
int tsdbPrepareCommit(STsdb *pTsdb);
|
||||||
int tsdbCommit(STsdb *pTsdb);
|
|
||||||
int32_t tsdbInitSma(STsdb *pTsdb);
|
int32_t tsdbInitSma(STsdb *pTsdb);
|
||||||
int32_t tsdbCreateTSma(STsdb *pTsdb, char *pMsg);
|
|
||||||
int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg);
|
int32_t tsdbDropTSma(STsdb *pTsdb, char *pMsg);
|
||||||
int32_t tsdbUpdateSmaWindow(STsdb *pTsdb, SSubmitReq *pMsg, int64_t version);
|
|
||||||
int32_t tsdbInsertTSmaData(STsdb *pTsdb, int64_t indexUid, const char *msg);
|
|
||||||
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
|
int32_t tsdbDropTSmaData(STsdb *pTsdb, int64_t indexUid);
|
||||||
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
|
int32_t tsdbInsertRSmaData(STsdb *pTsdb, char *msg);
|
||||||
void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
|
void tsdbCleanupReadHandle(tsdbReaderT queryHandle);
|
||||||
|
@ -237,12 +231,6 @@ static FORCE_INLINE TSKEY tsdbNextIterKey(SSkipListIterator *pIter) {
|
||||||
return TD_ROW_KEY(row);
|
return TD_ROW_KEY(row);
|
||||||
}
|
}
|
||||||
|
|
||||||
// tsdbOptions
|
|
||||||
extern const STsdbCfg defautlTsdbOptions;
|
|
||||||
|
|
||||||
int tsdbValidateOptions(const STsdbCfg *);
|
|
||||||
void tsdbOptionsCopy(STsdbCfg *pDest, const STsdbCfg *pSrc);
|
|
||||||
|
|
||||||
// tsdbReadImpl
|
// tsdbReadImpl
|
||||||
typedef struct SReadH SReadH;
|
typedef struct SReadH SReadH;
|
||||||
|
|
||||||
|
|
|
@ -16,9 +16,7 @@
|
||||||
#ifndef _TD_VNODE_TSDB_SMA_H_
|
#ifndef _TD_VNODE_TSDB_SMA_H_
|
||||||
#define _TD_VNODE_TSDB_SMA_H_
|
#define _TD_VNODE_TSDB_SMA_H_
|
||||||
|
|
||||||
#include "os.h"
|
#include "tsdb.h"
|
||||||
#include "thash.h"
|
|
||||||
#include "tmsg.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
|
@ -32,12 +30,6 @@ struct STbDdlH {
|
||||||
__tb_ddl_fn_t fp;
|
__tb_ddl_fn_t fp;
|
||||||
};
|
};
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
tb_uid_t suid;
|
|
||||||
SArray *tbUids;
|
|
||||||
SHashObj *uidHash;
|
|
||||||
} STbUidStore;
|
|
||||||
|
|
||||||
static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) {
|
static FORCE_INLINE int32_t tsdbUidStoreInit(STbUidStore **pStore) {
|
||||||
ASSERT(*pStore == NULL);
|
ASSERT(*pStore == NULL);
|
||||||
*pStore = taosMemoryCalloc(1, sizeof(STbUidStore));
|
*pStore = taosMemoryCalloc(1, sizeof(STbUidStore));
|
||||||
|
|
|
@ -16,6 +16,10 @@
|
||||||
#ifndef _TD_VND_H_
|
#ifndef _TD_VND_H_
|
||||||
#define _TD_VND_H_
|
#define _TD_VND_H_
|
||||||
|
|
||||||
|
#include "sync.h"
|
||||||
|
#include "syncTools.h"
|
||||||
|
#include "vnodeInt.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
@ -58,11 +62,9 @@ struct SVBufPool {
|
||||||
SVBufPoolNode node;
|
SVBufPoolNode node;
|
||||||
};
|
};
|
||||||
|
|
||||||
int vnodeOpenBufPool(SVnode* pVnode, int64_t size);
|
int vnodeOpenBufPool(SVnode* pVnode, int64_t size);
|
||||||
int vnodeCloseBufPool(SVnode* pVnode);
|
int vnodeCloseBufPool(SVnode* pVnode);
|
||||||
void vnodeBufPoolReset(SVBufPool* pPool);
|
void vnodeBufPoolReset(SVBufPool* pPool);
|
||||||
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
|
|
||||||
void vnodeBufPoolFree(SVBufPool* pPool, void* p);
|
|
||||||
|
|
||||||
// vnodeQuery ====================
|
// vnodeQuery ====================
|
||||||
int vnodeQueryOpen(SVnode* pVnode);
|
int vnodeQueryOpen(SVnode* pVnode);
|
||||||
|
@ -79,6 +81,20 @@ int vnodeLoadInfo(const char* dir, SVnodeInfo* pInfo);
|
||||||
int vnodeSyncCommit(SVnode* pVnode);
|
int vnodeSyncCommit(SVnode* pVnode);
|
||||||
int vnodeAsyncCommit(SVnode* pVnode);
|
int vnodeAsyncCommit(SVnode* pVnode);
|
||||||
|
|
||||||
|
// vnodeCommit ====================
|
||||||
|
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
|
||||||
|
int32_t vnodeSyncStart(SVnode* pVnode);
|
||||||
|
void vnodeSyncClose(SVnode* pVnode);
|
||||||
|
void vnodeSyncSetQ(SVnode* pVnode, void* qHandle);
|
||||||
|
void vnodeSyncSetRpc(SVnode* pVnode, void* rpcHandle);
|
||||||
|
int32_t vnodeSyncEqMsg(void* qHandle, SRpcMsg* pMsg);
|
||||||
|
int32_t vnodeSendMsg(void* rpcHandle, const SEpSet* pEpSet, SRpcMsg* pMsg);
|
||||||
|
void vnodeSyncCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||||
|
void vnodeSyncPreCommitCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||||
|
void vnodeSyncRollBackCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SFsmCbMeta cbMeta);
|
||||||
|
int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot);
|
||||||
|
SSyncFSM* syncVnodeMakeFsm();
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -59,6 +59,55 @@ typedef struct SQWorkerMgmt SQHandle;
|
||||||
#define VNODE_TQ_DIR "tq"
|
#define VNODE_TQ_DIR "tq"
|
||||||
#define VNODE_WAL_DIR "wal"
|
#define VNODE_WAL_DIR "wal"
|
||||||
|
|
||||||
|
// vnd.h
|
||||||
|
void* vnodeBufPoolMalloc(SVBufPool* pPool, int size);
|
||||||
|
void vnodeBufPoolFree(SVBufPool* pPool, void* p);
|
||||||
|
|
||||||
|
// meta
|
||||||
|
typedef struct SMCtbCursor SMCtbCursor;
|
||||||
|
typedef struct STbUidStore STbUidStore;
|
||||||
|
|
||||||
|
int metaOpen(SVnode* pVnode, SMeta** ppMeta);
|
||||||
|
int metaClose(SMeta* pMeta);
|
||||||
|
int metaBegin(SMeta* pMeta);
|
||||||
|
int metaCommit(SMeta* pMeta);
|
||||||
|
int metaCreateSTable(SMeta* pMeta, int64_t version, SVCreateStbReq* pReq);
|
||||||
|
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq);
|
||||||
|
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, bool isinline);
|
||||||
|
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver);
|
||||||
|
int metaGetTableEntryByUid(SMetaReader* pReader, tb_uid_t uid);
|
||||||
|
int metaGetTableEntryByName(SMetaReader* pReader, const char* name);
|
||||||
|
int metaGetTbNum(SMeta* pMeta);
|
||||||
|
SMCtbCursor* metaOpenCtbCursor(SMeta* pMeta, tb_uid_t uid);
|
||||||
|
void metaCloseCtbCurosr(SMCtbCursor* pCtbCur);
|
||||||
|
tb_uid_t metaCtbCursorNext(SMCtbCursor* pCtbCur);
|
||||||
|
SArray* metaGetSmaTbUids(SMeta* pMeta, bool isDup);
|
||||||
|
void* metaGetSmaInfoByIndex(SMeta* pMeta, int64_t indexUid, bool isDecode);
|
||||||
|
STSmaWrapper* metaGetSmaInfoByTable(SMeta* pMeta, tb_uid_t uid);
|
||||||
|
int32_t metaCreateTSma(SMeta* pMeta, SSmaCfg* pCfg);
|
||||||
|
int32_t metaDropTSma(SMeta* pMeta, int64_t indexUid);
|
||||||
|
|
||||||
|
// tsdb
|
||||||
|
int tsdbOpen(SVnode* pVnode, STsdb** ppTsdb);
|
||||||
|
int tsdbClose(STsdb* pTsdb);
|
||||||
|
int tsdbBegin(STsdb* pTsdb);
|
||||||
|
int tsdbCommit(STsdb* pTsdb);
|
||||||
|
int32_t tsdbUpdateSmaWindow(STsdb* pTsdb, SSubmitReq* pMsg, int64_t version);
|
||||||
|
int32_t tsdbCreateTSma(STsdb* pTsdb, char* pMsg);
|
||||||
|
int32_t tsdbInsertTSmaData(STsdb* pTsdb, int64_t indexUid, const char* msg);
|
||||||
|
int tsdbInsertData(STsdb* pTsdb, int64_t version, SSubmitReq* pMsg, SSubmitRsp* pRsp);
|
||||||
|
|
||||||
|
// tq
|
||||||
|
STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal);
|
||||||
|
void tqClose(STQ*);
|
||||||
|
int tqPushMsg(STQ*, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver);
|
||||||
|
int tqCommit(STQ*);
|
||||||
|
int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
|
int32_t tqProcessTaskExec(STQ* pTq, char* msg, int32_t msgLen, int32_t workerId);
|
||||||
|
int32_t tqProcessTaskDeploy(STQ* pTq, char* msg, int32_t msgLen);
|
||||||
|
int32_t tqProcessStreamTrigger(STQ* pTq, void* data, int32_t dataLen, int32_t workerId);
|
||||||
|
int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId);
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int8_t streamType; // sma or other
|
int8_t streamType; // sma or other
|
||||||
int8_t dstType;
|
int8_t dstType;
|
||||||
|
@ -106,6 +155,12 @@ struct SVnode {
|
||||||
SQHandle* pQuery;
|
SQHandle* pQuery;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
struct STbUidStore {
|
||||||
|
tb_uid_t suid;
|
||||||
|
SArray* tbUids;
|
||||||
|
SHashObj* uidHash;
|
||||||
|
};
|
||||||
|
|
||||||
#define TD_VID(PVNODE) (PVNODE)->config.vgId
|
#define TD_VID(PVNODE) (PVNODE)->config.vgId
|
||||||
|
|
||||||
typedef struct STbDdlH STbDdlH;
|
typedef struct STbDdlH STbDdlH;
|
||||||
|
@ -113,18 +168,6 @@ typedef struct STbDdlH STbDdlH;
|
||||||
// sma
|
// sma
|
||||||
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
|
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
|
||||||
|
|
||||||
#include "vnd.h"
|
|
||||||
|
|
||||||
#include "meta.h"
|
|
||||||
|
|
||||||
#include "tsdb.h"
|
|
||||||
|
|
||||||
#include "tq.h"
|
|
||||||
|
|
||||||
#include "vnodeSync.h"
|
|
||||||
|
|
||||||
#include "tsdbSma.h"
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -1,44 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#ifndef _TD_VNODE_SYNC_H_
|
|
||||||
#define _TD_VNODE_SYNC_H_
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
extern "C" {
|
|
||||||
#endif
|
|
||||||
|
|
||||||
int32_t vnodeSyncOpen(SVnode *pVnode, char *path);
|
|
||||||
int32_t vnodeSyncStart(SVnode *pVnode);
|
|
||||||
void vnodeSyncClose(SVnode *pVnode);
|
|
||||||
|
|
||||||
void vnodeSyncSetQ(SVnode *pVnode, void *qHandle);
|
|
||||||
void vnodeSyncSetRpc(SVnode *pVnode, void *rpcHandle);
|
|
||||||
|
|
||||||
int32_t vnodeSyncEqMsg(void *qHandle, SRpcMsg *pMsg);
|
|
||||||
int32_t vnodeSendMsg(void *rpcHandle, const SEpSet *pEpSet, SRpcMsg *pMsg);
|
|
||||||
|
|
||||||
void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
|
|
||||||
void vnodeSyncPreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
|
|
||||||
void vnodeSyncRollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta);
|
|
||||||
int32_t vnodeSyncGetSnapshotCb(struct SSyncFSM *pFsm, SSnapshot *pSnapshot);
|
|
||||||
|
|
||||||
SSyncFSM *syncVnodeMakeFsm();
|
|
||||||
|
|
||||||
#ifdef __cplusplus
|
|
||||||
}
|
|
||||||
#endif
|
|
||||||
|
|
||||||
#endif /*_TD_VNODE_SYNC_H_*/
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "meta.h"
|
||||||
|
|
||||||
static FORCE_INLINE void *metaMalloc(void *pPool, size_t size) { return vnodeBufPoolMalloc((SVBufPool *)pPool, size); }
|
static FORCE_INLINE void *metaMalloc(void *pPool, size_t size) { return vnodeBufPoolMalloc((SVBufPool *)pPool, size); }
|
||||||
static FORCE_INLINE void metaFree(void *pPool, void *p) { vnodeBufPoolFree((SVBufPool *)pPool, p); }
|
static FORCE_INLINE void metaFree(void *pPool, void *p) { vnodeBufPoolFree((SVBufPool *)pPool, p); }
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "meta.h"
|
||||||
|
|
||||||
int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) {
|
int metaEncodeEntry(SCoder *pCoder, const SMetaEntry *pME) {
|
||||||
if (tStartEncode(pCoder) < 0) return -1;
|
if (tStartEncode(pCoder) < 0) return -1;
|
||||||
|
|
|
@ -16,7 +16,7 @@
|
||||||
#ifdef USE_INVERTED_INDEX
|
#ifdef USE_INVERTED_INDEX
|
||||||
#include "index.h"
|
#include "index.h"
|
||||||
#endif
|
#endif
|
||||||
#include "vnodeInt.h"
|
#include "meta.h"
|
||||||
|
|
||||||
struct SMetaIdx {
|
struct SMetaIdx {
|
||||||
#ifdef USE_INVERTED_INDEX
|
#ifdef USE_INVERTED_INDEX
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "meta.h"
|
||||||
|
|
||||||
static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||||
static int skmDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
static int skmDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
|
||||||
|
|
|
@ -13,12 +13,12 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "meta.h"
|
||||||
|
|
||||||
void metaReaderInit(SMetaReader *pReader, SVnode *pVnode, int32_t flags) {
|
void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags) {
|
||||||
memset(pReader, 0, sizeof(*pReader));
|
memset(pReader, 0, sizeof(*pReader));
|
||||||
pReader->flags = flags;
|
pReader->flags = flags;
|
||||||
pReader->pMeta = pVnode->pMeta;
|
pReader->pMeta = pMeta;
|
||||||
}
|
}
|
||||||
|
|
||||||
void metaReaderClear(SMetaReader *pReader) {
|
void metaReaderClear(SMetaReader *pReader) {
|
||||||
|
@ -91,7 +91,7 @@ SMTbCursor *metaOpenTbCursor(SMeta *pMeta) {
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
||||||
|
|
||||||
metaReaderInit(&pTbCur->mr, pMeta->pVnode, 0);
|
metaReaderInit(&pTbCur->mr, pMeta, 0);
|
||||||
|
|
||||||
tdbDbcOpen(pMeta->pUidIdx, &pTbCur->pDbc);
|
tdbDbcOpen(pMeta->pUidIdx, &pTbCur->pDbc);
|
||||||
|
|
||||||
|
@ -122,7 +122,7 @@ int metaTbCursorNext(SMTbCursor *pTbCur) {
|
||||||
}
|
}
|
||||||
|
|
||||||
metaGetTableEntryByVersion(&pTbCur->mr, *(int64_t *)pTbCur->pVal, *(tb_uid_t *)pTbCur->pKey);
|
metaGetTableEntryByVersion(&pTbCur->mr, *(int64_t *)pTbCur->pVal, *(tb_uid_t *)pTbCur->pKey);
|
||||||
if (pTbCur->mr.me.type == META_SUPER_TABLE) {
|
if (pTbCur->mr.me.type == TSDB_SUPER_TABLE) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -234,7 +234,7 @@ STSchema *metaGetTbTSchema(SMeta *pMeta, tb_uid_t uid, int32_t sver) {
|
||||||
STSchemaBuilder sb = {0};
|
STSchemaBuilder sb = {0};
|
||||||
SSchema *pSchema;
|
SSchema *pSchema;
|
||||||
|
|
||||||
metaReaderInit(&mr, pMeta->pVnode, 0);
|
metaReaderInit(&mr, pMeta, 0);
|
||||||
metaGetTableEntryByUid(&mr, uid);
|
metaGetTableEntryByUid(&mr, uid);
|
||||||
|
|
||||||
if (mr.me.type == TSDB_CHILD_TABLE) {
|
if (mr.me.type == TSDB_CHILD_TABLE) {
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "meta.h"
|
||||||
|
|
||||||
static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME);
|
static int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
|
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
|
||||||
|
@ -37,7 +37,7 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
|
|
||||||
// validate req
|
// validate req
|
||||||
metaReaderInit(&mr, pMeta->pVnode, 0);
|
metaReaderInit(&mr, pMeta, 0);
|
||||||
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
|
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
|
||||||
// TODO: just for pass case
|
// TODO: just for pass case
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -91,7 +91,7 @@ int metaCreateTable(SMeta *pMeta, int64_t version, SVCreateTbReq *pReq) {
|
||||||
pReq->ctime = taosGetTimestampSec();
|
pReq->ctime = taosGetTimestampSec();
|
||||||
|
|
||||||
// validate req
|
// validate req
|
||||||
metaReaderInit(&mr, pMeta->pVnode, 0);
|
metaReaderInit(&mr, pMeta, 0);
|
||||||
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
|
if (metaGetTableEntryByName(&mr, pReq->name) == 0) {
|
||||||
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
|
terrno = TSDB_CODE_TDB_TABLE_ALREADY_EXIST;
|
||||||
metaReaderClear(&mr);
|
metaReaderClear(&mr);
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "tq.h"
|
||||||
|
|
||||||
int32_t tqInit() {
|
int32_t tqInit() {
|
||||||
//
|
//
|
||||||
|
@ -176,9 +176,9 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
|
||||||
atomic_store_ptr(&pExec->pushHandle.handle, NULL);
|
atomic_store_ptr(&pExec->pushHandle.handle, NULL);
|
||||||
taosWUnLockLatch(&pExec->pushHandle.lock);
|
taosWUnLockLatch(&pExec->pushHandle.lock);
|
||||||
|
|
||||||
vDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
|
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
|
||||||
TD_VID(pTq->pVnode), fetchOffset, pExec->pushHandle.consumerId, pExec->pushHandle.epoch, rsp.blockNum,
|
TD_VID(pTq->pVnode), fetchOffset, pExec->pushHandle.consumerId, pExec->pushHandle.epoch, rsp.blockNum,
|
||||||
rsp.reqOffset, rsp.rspOffset);
|
rsp.reqOffset, rsp.rspOffset);
|
||||||
|
|
||||||
// TODO destroy
|
// TODO destroy
|
||||||
taosArrayDestroy(rsp.blockData);
|
taosArrayDestroy(rsp.blockData);
|
||||||
|
@ -390,8 +390,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
fetchOffset = pReq->currentOffset + 1;
|
fetchOffset = pReq->currentOffset + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
|
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
|
||||||
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
||||||
|
|
||||||
STqExec* pExec = taosHashGet(pTq->execs, pReq->subKey, strlen(pReq->subKey));
|
STqExec* pExec = taosHashGet(pTq->execs, pReq->subKey, strlen(pReq->subKey));
|
||||||
ASSERT(pExec);
|
ASSERT(pExec);
|
||||||
|
@ -418,16 +418,16 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
while (1) {
|
while (1) {
|
||||||
consumerEpoch = atomic_load_32(&pExec->epoch);
|
consumerEpoch = atomic_load_32(&pExec->epoch);
|
||||||
if (consumerEpoch > reqEpoch) {
|
if (consumerEpoch > reqEpoch) {
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
|
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
|
||||||
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
|
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
taosThreadMutexLock(&pExec->pWalReader->mutex);
|
taosThreadMutexLock(&pExec->pWalReader->mutex);
|
||||||
|
|
||||||
if (walFetchHead(pExec->pWalReader, fetchOffset, pHeadWithCkSum) < 0) {
|
if (walFetchHead(pExec->pWalReader, fetchOffset, pHeadWithCkSum) < 0) {
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
|
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
|
||||||
TD_VID(pTq->pVnode), fetchOffset);
|
TD_VID(pTq->pVnode), fetchOffset);
|
||||||
taosThreadMutexUnlock(&pExec->pWalReader->mutex);
|
taosThreadMutexUnlock(&pExec->pWalReader->mutex);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -448,7 +448,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
// TODO: no more log, set timer to wait blocking time
|
// TODO: no more log, set timer to wait blocking time
|
||||||
// if data inserted during waiting, launch query and
|
// if data inserted during waiting, launch query and
|
||||||
// response to user
|
// response to user
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
|
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
|
||||||
TD_VID(pTq->pVnode), fetchOffset);
|
TD_VID(pTq->pVnode), fetchOffset);
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
|
@ -476,8 +476,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
|
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
|
||||||
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
|
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
|
||||||
|
|
||||||
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
if (pHead->msgType == TDMT_VND_SUBMIT) {
|
||||||
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
SSubmitReq* pCont = (SSubmitReq*)&pHead->body;
|
||||||
|
@ -591,8 +591,8 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
pMsg->code = 0;
|
pMsg->code = 0;
|
||||||
tmsgSendRsp(pMsg);
|
tmsgSendRsp(pMsg);
|
||||||
|
|
||||||
vDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
|
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) send rsp, block num: %d, reqOffset: %ld, rspOffset: %ld",
|
||||||
TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
|
TD_VID(pTq->pVnode), fetchOffset, consumerId, pReq->epoch, rsp.blockNum, rsp.reqOffset, rsp.rspOffset);
|
||||||
|
|
||||||
// TODO destroy
|
// TODO destroy
|
||||||
taosArrayDestroy(rsp.blockData);
|
taosArrayDestroy(rsp.blockData);
|
||||||
|
@ -618,7 +618,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
fetchOffset = pReq->currentOffset + 1;
|
fetchOffset = pReq->currentOffset + 1;
|
||||||
}
|
}
|
||||||
|
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
|
tqDebug("tmq poll: consumer %ld (epoch %d) recv poll req in vg %d, req %ld %ld", consumerId, pReq->epoch,
|
||||||
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
TD_VID(pTq->pVnode), pReq->currentOffset, fetchOffset);
|
||||||
|
|
||||||
SMqPollRspV2 rspV2 = {0};
|
SMqPollRspV2 rspV2 = {0};
|
||||||
|
@ -660,7 +660,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
vDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
|
tqDebug("poll topic %s from consumer %ld (epoch %d) vg %d", pTopic->topicName, consumerId, pReq->epoch,
|
||||||
TD_VID(pTq->pVnode));
|
TD_VID(pTq->pVnode));
|
||||||
|
|
||||||
rspV2.reqOffset = pReq->currentOffset;
|
rspV2.reqOffset = pReq->currentOffset;
|
||||||
|
@ -671,7 +671,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
// TODO
|
// TODO
|
||||||
consumerEpoch = atomic_load_32(&pConsumer->epoch);
|
consumerEpoch = atomic_load_32(&pConsumer->epoch);
|
||||||
if (consumerEpoch > reqEpoch) {
|
if (consumerEpoch > reqEpoch) {
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
|
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, found new consumer epoch %d discard req epoch %d",
|
||||||
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
|
consumerId, pReq->epoch, TD_VID(pTq->pVnode), fetchOffset, consumerEpoch, reqEpoch);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -680,11 +680,11 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
// TODO: no more log, set timer to wait blocking time
|
// TODO: no more log, set timer to wait blocking time
|
||||||
// if data inserted during waiting, launch query and
|
// if data inserted during waiting, launch query and
|
||||||
// response to user
|
// response to user
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
|
tqDebug("tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return", consumerId, pReq->epoch,
|
||||||
TD_VID(pTq->pVnode), fetchOffset);
|
TD_VID(pTq->pVnode), fetchOffset);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
|
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d", consumerId, pReq->epoch,
|
||||||
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
|
TD_VID(pTq->pVnode), fetchOffset, pHead->msgType);
|
||||||
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
|
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
|
||||||
/*pHead = pTopic->pReadhandle->pHead;*/
|
/*pHead = pTopic->pReadhandle->pHead;*/
|
||||||
|
@ -709,7 +709,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (taosArrayGetSize(pRes) == 0) {
|
if (taosArrayGetSize(pRes) == 0) {
|
||||||
vDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
|
tqDebug("tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted", consumerId,
|
||||||
pReq->epoch, TD_VID(pTq->pVnode), fetchOffset);
|
pReq->epoch, TD_VID(pTq->pVnode), fetchOffset);
|
||||||
fetchOffset++;
|
fetchOffset++;
|
||||||
rspV2.skipLogNum++;
|
rspV2.skipLogNum++;
|
||||||
|
@ -770,7 +770,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
pMsg->pCont = buf;
|
pMsg->pCont = buf;
|
||||||
pMsg->contLen = msgLen;
|
pMsg->contLen = msgLen;
|
||||||
pMsg->code = 0;
|
pMsg->code = 0;
|
||||||
vDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
|
tqDebug("vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp", TD_VID(pTq->pVnode), fetchOffset,
|
||||||
pHead->msgType, consumerId, pReq->epoch);
|
pHead->msgType, consumerId, pReq->epoch);
|
||||||
tmsgSendRsp(pMsg);
|
tmsgSendRsp(pMsg);
|
||||||
taosMemoryFree(pHead);
|
taosMemoryFree(pHead);
|
||||||
|
@ -804,7 +804,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
|
||||||
pMsg->contLen = tlen;
|
pMsg->contLen = tlen;
|
||||||
pMsg->code = 0;
|
pMsg->code = 0;
|
||||||
tmsgSendRsp(pMsg);
|
tmsgSendRsp(pMsg);
|
||||||
vDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
|
tqDebug("vg %d offset %ld from consumer %ld (epoch %d) not rsp", TD_VID(pTq->pVnode), fetchOffset, consumerId,
|
||||||
pReq->epoch);
|
pReq->epoch);
|
||||||
/*}*/
|
/*}*/
|
||||||
|
|
||||||
|
|
|
@ -13,4 +13,4 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "tq.h"
|
||||||
|
|
|
@ -12,7 +12,7 @@
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
* You should have received a copy of the GNU Affero General Public License
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
#include "vnodeInt.h"
|
#include "tq.h"
|
||||||
// #include <fcntl.h>
|
// #include <fcntl.h>
|
||||||
// #include <string.h>
|
// #include <string.h>
|
||||||
// #include <unistd.h>
|
// #include <unistd.h>
|
||||||
|
@ -86,7 +86,7 @@ STqMetaStore* tqStoreOpen(STQ* pTq, const char* path, FTqSerialize serializer, F
|
||||||
}
|
}
|
||||||
strcpy(pMeta->dirPath, path);
|
strcpy(pMeta->dirPath, path);
|
||||||
|
|
||||||
char *name = taosMemoryMalloc(pathLen + 10) ;
|
char* name = taosMemoryMalloc(pathLen + 10);
|
||||||
|
|
||||||
strcpy(name, path);
|
strcpy(name, path);
|
||||||
if (!taosDirExist(name) && taosMkDir(name) != 0) {
|
if (!taosDirExist(name) && taosMkDir(name) != 0) {
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
*/
|
*/
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "tq.h"
|
||||||
|
|
||||||
enum ETqOffsetPersist {
|
enum ETqOffsetPersist {
|
||||||
TQ_OFFSET_PERSIST__LAZY = 1,
|
TQ_OFFSET_PERSIST__LAZY = 1,
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "tq.h"
|
||||||
|
|
||||||
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
|
STqReadHandle* tqInitSubmitMsgScanner(SMeta* pMeta) {
|
||||||
STqReadHandle* pReadHandle = taosMemoryMalloc(sizeof(STqReadHandle));
|
STqReadHandle* pReadHandle = taosMemoryMalloc(sizeof(STqReadHandle));
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
#define TSDB_MAX_SUBBLOCKS 8
|
#define TSDB_MAX_SUBBLOCKS 8
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
int tsdbBegin(STsdb *pTsdb) {
|
int tsdbBegin(STsdb *pTsdb) {
|
||||||
STsdbMemTable *pMem;
|
STsdbMemTable *pMem;
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
#if 0
|
#if 0
|
||||||
#include "tsdbint.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
STable * pTable;
|
STable * pTable;
|
||||||
|
@ -33,15 +33,15 @@ typedef struct {
|
||||||
SDataCols *pDataCols;
|
SDataCols *pDataCols;
|
||||||
} SCompactH;
|
} SCompactH;
|
||||||
|
|
||||||
#define TSDB_COMPACT_WSET(pComph) (&((pComph)->wSet))
|
#define TSDB_COMPACT_WSET(pComph) (&((pComph)->wSet))
|
||||||
#define TSDB_COMPACT_REPO(pComph) TSDB_READ_REPO(&((pComph)->readh))
|
#define TSDB_COMPACT_REPO(pComph) TSDB_READ_REPO(&((pComph)->readh))
|
||||||
#define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD)
|
#define TSDB_COMPACT_HEAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_HEAD)
|
||||||
#define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA)
|
#define TSDB_COMPACT_DATA_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_DATA)
|
||||||
#define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST)
|
#define TSDB_COMPACT_LAST_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_LAST)
|
||||||
#define TSDB_COMPACT_SMAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAD)
|
#define TSDB_COMPACT_SMAD_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAD)
|
||||||
#define TSDB_COMPACT_SMAL_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAL)
|
#define TSDB_COMPACT_SMAL_FILE(pComph) TSDB_DFILE_IN_SET(TSDB_COMPACT_WSET(pComph), TSDB_FILE_SMAL)
|
||||||
#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh))
|
#define TSDB_COMPACT_BUF(pComph) TSDB_READ_BUF(&((pComph)->readh))
|
||||||
#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))
|
#define TSDB_COMPACT_COMP_BUF(pComph) TSDB_READ_COMP_BUF(&((pComph)->readh))
|
||||||
|
|
||||||
static int tsdbAsyncCompact(STsdbRepo *pRepo);
|
static int tsdbAsyncCompact(STsdbRepo *pRepo);
|
||||||
static void tsdbStartCompact(STsdbRepo *pRepo);
|
static void tsdbStartCompact(STsdbRepo *pRepo);
|
||||||
|
@ -531,4 +531,3 @@ static int tsdbCompactMeta(STsdbRepo *pRepo) {
|
||||||
}
|
}
|
||||||
|
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T;
|
typedef enum { TSDB_TXN_TEMP_FILE = 0, TSDB_TXN_CURR_FILE } TSDB_TXN_FILE_T;
|
||||||
static const char *tsdbTxnFname[] = {"current.t", "current"};
|
static const char *tsdbTxnFname[] = {"current.t", "current"};
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
static const char *TSDB_FNAME_SUFFIX[] = {
|
static const char *TSDB_FNAME_SUFFIX[] = {
|
||||||
"head", // TSDB_FILE_HEAD
|
"head", // TSDB_FILE_HEAD
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb) {
|
int tsdbOpen(SVnode *pVnode, STsdb **ppTsdb) {
|
||||||
STsdb *pTsdb = NULL;
|
STsdb *pTsdb = NULL;
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
static STbData *tsdbNewTbData(tb_uid_t uid);
|
static STbData *tsdbNewTbData(tb_uid_t uid);
|
||||||
static void tsdbFreeTbData(STbData *pTbData);
|
static void tsdbFreeTbData(STbData *pTbData);
|
||||||
|
|
|
@ -1,34 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
|
||||||
*
|
|
||||||
* This program is free software: you can use, redistribute, and/or modify
|
|
||||||
* it under the terms of the GNU Affero General Public License, version 3
|
|
||||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
|
||||||
*
|
|
||||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
|
||||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
|
||||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
|
||||||
*
|
|
||||||
* You should have received a copy of the GNU Affero General Public License
|
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
||||||
*/
|
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
|
||||||
|
|
||||||
const STsdbCfg defautlTsdbOptions = {.precision = 0,
|
|
||||||
.lruCacheSize = 0,
|
|
||||||
.days = 10,
|
|
||||||
.minRows = 100,
|
|
||||||
.maxRows = 4096,
|
|
||||||
.keep2 = 3650,
|
|
||||||
.keep0 = 3650,
|
|
||||||
.keep1 = 3650,
|
|
||||||
.update = 0,
|
|
||||||
.compression = TWO_STAGE_COMP};
|
|
||||||
|
|
||||||
int tsdbValidateOptions(const STsdbCfg *pTsdbOptions) {
|
|
||||||
// TODO
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
void tsdbOptionsCopy(STsdbCfg *pDest, const STsdbCfg *pSrc) { memcpy(pDest, pSrc, sizeof(STsdbCfg)); }
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
#define EXTRA_BYTES 2
|
#define EXTRA_BYTES 2
|
||||||
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
|
||||||
|
@ -3618,7 +3618,7 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
|
||||||
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) {
|
SColIndex* pColIndex, int32_t numOfCols, uint64_t reqId, uint64_t taskId) {
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
|
|
||||||
metaReaderInit(&mr, ((SMeta*)pMeta)->pVnode, 0);
|
metaReaderInit(&mr, (SMeta*)pMeta, 0);
|
||||||
|
|
||||||
if (metaGetTableEntryByUid(&mr, uid) < 0) {
|
if (metaGetTableEntryByUid(&mr, uid) < 0) {
|
||||||
tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
|
tsdbError("%p failed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
|
||||||
|
@ -3628,7 +3628,7 @@ int32_t tsdbQuerySTableByTagCond(void* pMeta, uint64_t uid, TSKEY skey, const ch
|
||||||
tsdbDebug("%p succeed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
|
tsdbDebug("%p succeed to get stable, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId, reqId);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (mr.me.type != META_SUPER_TABLE) {
|
if (mr.me.type != TSDB_SUPER_TABLE) {
|
||||||
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId,
|
tsdbError("%p query normal tag not allowed, uid:%" PRIu64 ", TID:0x%" PRIx64 " QID:0x%" PRIx64, pMeta, uid, taskId,
|
||||||
reqId);
|
reqId);
|
||||||
terrno = TSDB_CODE_OPS_NOT_SUPPORT; // basically, this error is caused by invalid sql issued by client
|
terrno = TSDB_CODE_OPS_NOT_SUPPORT; // basically, this error is caused by invalid sql issued by client
|
||||||
|
@ -3687,10 +3687,9 @@ int32_t tsdbQueryTableList(void* pMeta, SArray* pRes, void* filterInfo) {
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
|
int32_t tsdbGetOneTableGroup(void* pMeta, uint64_t uid, TSKEY startKey, STableGroupInfo* pGroupInfo) {
|
||||||
SMeta* metaP = (SMeta*)pMeta;
|
|
||||||
SMetaReader mr = {0};
|
SMetaReader mr = {0};
|
||||||
|
|
||||||
metaReaderInit(&mr, metaP->pVnode, 0);
|
metaReaderInit(&mr, (SMeta*)pMeta, 0);
|
||||||
|
|
||||||
if (metaGetTableEntryByUid(&mr, uid) < 0) {
|
if (metaGetTableEntryByUid(&mr, uid) < 0) {
|
||||||
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
terrno = TSDB_CODE_TDB_INVALID_TABLE_ID;
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
#define TSDB_KEY_COL_OFFSET 0
|
#define TSDB_KEY_COL_OFFSET 0
|
||||||
|
|
||||||
|
|
|
@ -13,9 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
|
||||||
#if 0
|
#if 0
|
||||||
#include "tsdbint.h"
|
#include "tsdb.h"
|
||||||
#ifndef _TSDB_PLUGINS
|
#ifndef _TSDB_PLUGINS
|
||||||
|
|
||||||
int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; }
|
int tsdbScanFGroup(STsdbScanHandle* pScanHandle, char* rootDir, int fid) { return 0; }
|
||||||
|
|
|
@ -13,7 +13,8 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "tsdbSma.h"
|
||||||
|
#include "tsdb.h"
|
||||||
|
|
||||||
static const char *TSDB_SMA_DNAME[] = {
|
static const char *TSDB_SMA_DNAME[] = {
|
||||||
"", // TSDB_SMA_TYPE_BLOCK
|
"", // TSDB_SMA_TYPE_BLOCK
|
||||||
|
@ -678,7 +679,6 @@ int32_t tsdbUpdateExpiredWindowImpl(STsdb *pTsdb, SSubmitReq *pMsg, int64_t vers
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) {
|
if (tsdbCheckAndInitSmaEnv(pTsdb, TSDB_SMA_TYPE_TIME_RANGE) != TSDB_CODE_SUCCESS) {
|
||||||
terrno = TSDB_CODE_TDB_INIT_FAILED;
|
terrno = TSDB_CODE_TDB_INIT_FAILED;
|
||||||
return TSDB_CODE_FAILED;
|
return TSDB_CODE_FAILED;
|
||||||
|
|
|
@ -15,14 +15,14 @@
|
||||||
|
|
||||||
#define ALLOW_FORBID_FUNC
|
#define ALLOW_FORBID_FUNC
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
int32_t tsdbOpenDBEnv(TENV **ppEnv, const char *path) {
|
int32_t tsdbOpenDBEnv(TENV **ppEnv, const char *path) {
|
||||||
int ret = 0;
|
int ret = 0;
|
||||||
|
|
||||||
if (path == NULL) return -1;
|
if (path == NULL) return -1;
|
||||||
|
|
||||||
ret = tdbEnvOpen(path, 4096, 256, ppEnv); // use as param
|
ret = tdbEnvOpen(path, 4096, 256, ppEnv); // use as param
|
||||||
|
|
||||||
if (ret != 0) {
|
if (ret != 0) {
|
||||||
tsdbError("Failed to create tsdb db env, ret = %d", ret);
|
tsdbError("Failed to create tsdb db env, ret = %d", ret);
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "tsdb.h"
|
||||||
|
|
||||||
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
|
static int tsdbScanAndConvertSubmitMsg(STsdb *pTsdb, SSubmitReq *pMsg);
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
/* ------------------------ STRUCTURES ------------------------ */
|
/* ------------------------ STRUCTURES ------------------------ */
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
const SVnodeCfg vnodeCfgDefault = {
|
const SVnodeCfg vnodeCfgDefault = {
|
||||||
.vgId = -1,
|
.vgId = -1,
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
#define VND_INFO_FNAME "vnode.json"
|
#define VND_INFO_FNAME "vnode.json"
|
||||||
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
|
#define VND_INFO_FNAME_TMP "vnode_tmp.json"
|
||||||
|
@ -175,7 +175,7 @@ int vnodeAsyncCommit(SVnode *pVnode) {
|
||||||
vnodeWaitCommit(pVnode);
|
vnodeWaitCommit(pVnode);
|
||||||
|
|
||||||
// vnodeBufPoolSwitch(pVnode);
|
// vnodeBufPoolSwitch(pVnode);
|
||||||
tsdbPrepareCommit(pVnode->pTsdb);
|
// tsdbPrepareCommit(pVnode->pTsdb);
|
||||||
|
|
||||||
vnodeScheduleTask(vnodeCommitImpl, pVnode);
|
vnodeScheduleTask(vnodeCommitImpl, pVnode);
|
||||||
|
|
||||||
|
|
|
@ -14,7 +14,7 @@
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#define _DEFAULT_SOURCE
|
#define _DEFAULT_SOURCE
|
||||||
#include "vnodeInt.h"
|
#include "vnd.h"
|
||||||
// #include "vnodeInt.h"
|
// #include "vnodeInt.h"
|
||||||
|
|
||||||
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
|
int32_t vnodeAlter(SVnode *pVnode, const SVnodeCfg *pCfg) { return 0; }
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
typedef struct SVnodeTask SVnodeTask;
|
typedef struct SVnodeTask SVnodeTask;
|
||||||
struct SVnodeTask {
|
struct SVnodeTask {
|
||||||
|
|
|
@ -13,8 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnd.h"
|
||||||
#include "vnodeSync.h"
|
|
||||||
|
|
||||||
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
int vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs) {
|
||||||
SVnodeInfo info = {0};
|
SVnodeInfo info = {0};
|
||||||
|
|
|
@ -13,7 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "vnodeInt.h"
|
#include "vnd.h"
|
||||||
|
|
||||||
int vnodeQueryOpen(SVnode *pVnode) {
|
int vnodeQueryOpen(SVnode *pVnode) {
|
||||||
return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb);
|
return qWorkerInit(NODE_TYPE_VNODE, TD_VID(pVnode), NULL, (void **)&pVnode->pQuery, &pVnode->msgCb);
|
||||||
|
@ -51,7 +51,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// query meta
|
// query meta
|
||||||
metaReaderInit(&mer1, pVnode, 0);
|
metaReaderInit(&mer1, pVnode->pMeta, 0);
|
||||||
|
|
||||||
if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) {
|
if (metaGetTableEntryByName(&mer1, infoReq.tbName) < 0) {
|
||||||
goto _exit;
|
goto _exit;
|
||||||
|
@ -67,7 +67,7 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
|
||||||
schemaTag = mer1.me.stbEntry.schemaTag;
|
schemaTag = mer1.me.stbEntry.schemaTag;
|
||||||
metaRsp.suid = mer1.me.uid;
|
metaRsp.suid = mer1.me.uid;
|
||||||
} else if (mer1.me.type == TSDB_CHILD_TABLE) {
|
} else if (mer1.me.type == TSDB_CHILD_TABLE) {
|
||||||
metaReaderInit(&mer2, pVnode, 0);
|
metaReaderInit(&mer2, pVnode->pMeta, 0);
|
||||||
if (metaGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
|
if (metaGetTableEntryByUid(&mer2, mer1.me.ctbEntry.suid) < 0) goto _exit;
|
||||||
|
|
||||||
strcpy(metaRsp.stbName, mer2.me.name);
|
strcpy(metaRsp.stbName, mer2.me.name);
|
||||||
|
|
|
@ -13,9 +13,7 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "sync.h"
|
#include "vnd.h"
|
||||||
#include "syncTools.h"
|
|
||||||
#include "vnodeInt.h"
|
|
||||||
|
|
||||||
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
|
static int vnodeProcessCreateStbReq(SVnode *pVnode, int64_t version, void *pReq, int len, SRpcMsg *pRsp);
|
||||||
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
|
static int vnodeProcessAlterStbReq(SVnode *pVnode, void *pReq, int32_t len, SRpcMsg *pRsp);
|
||||||
|
@ -201,9 +199,7 @@ void smaHandleRes(void *pVnode, int64_t smaId, const SArray *data) {
|
||||||
|
|
||||||
// sync integration
|
// sync integration
|
||||||
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
int vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||||
|
|
||||||
if (syncEnvIsStart()) {
|
if (syncEnvIsStart()) {
|
||||||
|
|
||||||
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
SSyncNode *pSyncNode = syncNodeAcquire(pVnode->sync);
|
||||||
assert(pSyncNode != NULL);
|
assert(pSyncNode != NULL);
|
||||||
|
|
||||||
|
|
|
@ -13,10 +13,11 @@
|
||||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
#include "sync.h"
|
#include "vnd.h"
|
||||||
#include "syncTools.h"
|
// #include "sync.h"
|
||||||
#include "tmsgcb.h"
|
// #include "syncTools.h"
|
||||||
#include "vnodeInt.h"
|
// #include "tmsgcb.h"
|
||||||
|
// #include "vnodeInt.h"
|
||||||
|
|
||||||
// sync integration
|
// sync integration
|
||||||
|
|
||||||
|
@ -113,7 +114,7 @@ void vnodeSyncCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cb
|
||||||
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
|
pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), beginIndex);
|
||||||
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
|
||||||
|
|
||||||
SVnode * pVnode = (SVnode *)(pFsm->data);
|
SVnode *pVnode = (SVnode *)(pFsm->data);
|
||||||
SyncApplyMsg *pSyncApplyMsg = syncApplyMsgBuild2(pMsg, pVnode->config.vgId, &cbMeta);
|
SyncApplyMsg *pSyncApplyMsg = syncApplyMsgBuild2(pMsg, pVnode->config.vgId, &cbMeta);
|
||||||
SRpcMsg applyMsg;
|
SRpcMsg applyMsg;
|
||||||
syncApplyMsg2RpcMsg(pSyncApplyMsg, &applyMsg);
|
syncApplyMsg2RpcMsg(pSyncApplyMsg, &applyMsg);
|
||||||
|
|
Loading…
Reference in New Issue