Merge pull request #9240 from taosdata/feature/vnode
Merge branch '3.0' into feature/vnode
This commit is contained in:
commit
32763aa964
|
@ -19,5 +19,5 @@ target_link_libraries(
|
|||
|
||||
# test
|
||||
if(${BUILD_TEST})
|
||||
#add_subdirectory(test)
|
||||
add_subdirectory(test)
|
||||
endif(${BUILD_TEST})
|
|
@ -71,6 +71,7 @@ struct SVnode {
|
|||
SWal* pWal;
|
||||
SVnodeSync* pSync;
|
||||
SVnodeFS* pFs;
|
||||
tsem_t canCommit;
|
||||
};
|
||||
|
||||
int vnodeScheduleTask(SVnodeTask* task);
|
||||
|
|
|
@ -38,9 +38,10 @@ int vnodeCommit(void *arg) {
|
|||
|
||||
metaCommit(pVnode->pMeta);
|
||||
tqCommit(pVnode->pTq);
|
||||
tsdbCommit(pVnode->pTq);
|
||||
tsdbCommit(pVnode->pTsdb);
|
||||
|
||||
vnodeBufPoolRecycle(pVnode);
|
||||
tsem_post(&(pVnode->canCommit));
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
|
|
|
@ -74,11 +74,14 @@ static SVnode *vnodeNew(const char *path, const SVnodeCfg *pVnodeCfg) {
|
|||
pVnode->path = strdup(path);
|
||||
vnodeOptionsCopy(&(pVnode->config), pVnodeCfg);
|
||||
|
||||
tsem_init(&(pVnode->canCommit), 0, 1);
|
||||
|
||||
return pVnode;
|
||||
}
|
||||
|
||||
static void vnodeFree(SVnode *pVnode) {
|
||||
if (pVnode) {
|
||||
tsem_destroy(&(pVnode->canCommit));
|
||||
tfree(pVnode->path);
|
||||
free(pVnode);
|
||||
}
|
||||
|
|
|
@ -45,13 +45,13 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
|||
|
||||
walFsync(pVnode->pWal, false);
|
||||
|
||||
// Apply each request now
|
||||
for (int i = 0; i < taosArrayGetSize(pMsgs); i++) {
|
||||
pMsg = *(SRpcMsg **)taosArrayGet(pMsgs, i);
|
||||
SVnodeReq vReq;
|
||||
// TODO: Integrate RAFT module here
|
||||
|
||||
// Apply the request
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
SVnodeReq vReq;
|
||||
void * ptr = vnodeMalloc(pVnode, pMsg->contLen);
|
||||
if (ptr == NULL) {
|
||||
// TODO: handle error
|
||||
|
@ -92,21 +92,14 @@ int vnodeProcessWMsgs(SVnode *pVnode, SArray *pMsgs) {
|
|||
}
|
||||
|
||||
pVnode->state.applied = ver;
|
||||
}
|
||||
|
||||
// Check if it needs to commit
|
||||
if (vnodeShouldCommit(pVnode)) {
|
||||
tsem_wait(&(pVnode->canCommit));
|
||||
if (vnodeAsyncCommit(pVnode) < 0) {
|
||||
// TODO: handle error
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int vnodeApplyWMsg(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
|
||||
// TODO
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
|
@ -166,6 +166,21 @@ static void vtClearMsgBatch(SArray *pMsgArr) {
|
|||
taosArrayClear(pMsgArr);
|
||||
}
|
||||
|
||||
static void vtProcessAndApplyReqs(SVnode *pVnode, SArray *pMsgArr) {
|
||||
int rcode;
|
||||
SRpcMsg *pReq;
|
||||
SRpcMsg *pRsp;
|
||||
|
||||
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
|
||||
GTEST_ASSERT_EQ(rcode, 0);
|
||||
|
||||
for (size_t i = 0; i < taosArrayGetSize(pMsgArr); i++) {
|
||||
pReq = *(SRpcMsg **)taosArrayGet(pMsgArr, i);
|
||||
rcode = vnodeApplyWMsg(pVnode, pReq, NULL);
|
||||
GTEST_ASSERT_EQ(rcode, 0);
|
||||
}
|
||||
}
|
||||
|
||||
TEST(vnodeApiTest, vnode_simple_create_table_test) {
|
||||
tb_uid_t suid = 1638166374163;
|
||||
SRpcMsg *pMsg;
|
||||
|
@ -189,8 +204,7 @@ TEST(vnodeApiTest, vnode_simple_create_table_test) {
|
|||
sprintf(tbname, "st");
|
||||
vtBuildCreateStbReq(suid, tbname, &pMsg);
|
||||
taosArrayPush(pMsgArr, &pMsg);
|
||||
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
|
||||
ASSERT_EQ(rcode, 0);
|
||||
vtProcessAndApplyReqs(pVnode, pMsgArr);
|
||||
vtClearMsgBatch(pMsgArr);
|
||||
|
||||
// CREATE A LOT OF CHILD TABLES
|
||||
|
@ -203,8 +217,7 @@ TEST(vnodeApiTest, vnode_simple_create_table_test) {
|
|||
}
|
||||
|
||||
// Process request batch
|
||||
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
|
||||
ASSERT_EQ(rcode, 0);
|
||||
vtProcessAndApplyReqs(pVnode, pMsgArr);
|
||||
|
||||
// Clear request batch
|
||||
vtClearMsgBatch(pMsgArr);
|
||||
|
@ -242,16 +255,14 @@ TEST(vnodeApiTest, vnode_simple_insert_test) {
|
|||
sprintf(tbname, "st");
|
||||
vtBuildCreateStbReq(suid, tbname, &pMsg);
|
||||
taosArrayPush(pMsgArr, &pMsg);
|
||||
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
|
||||
GTEST_ASSERT_EQ(rcode, 0);
|
||||
vtProcessAndApplyReqs(pVnode, pMsgArr);
|
||||
vtClearMsgBatch(pMsgArr);
|
||||
|
||||
// 2. CREATE A CHILD TABLE
|
||||
sprintf(tbname, "t0");
|
||||
vtBuildCreateCtbReq(suid, tbname, &pMsg);
|
||||
taosArrayPush(pMsgArr, &pMsg);
|
||||
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
|
||||
GTEST_ASSERT_EQ(rcode, 0);
|
||||
vtProcessAndApplyReqs(pVnode, pMsgArr);
|
||||
vtClearMsgBatch(pMsgArr);
|
||||
|
||||
// 3. WRITE A LOT OF TIME-SERIES DATA
|
||||
|
@ -260,8 +271,7 @@ TEST(vnodeApiTest, vnode_simple_insert_test) {
|
|||
vtBuildSubmitReq(&pMsg);
|
||||
taosArrayPush(pMsgArr, &pMsg);
|
||||
}
|
||||
rcode = vnodeProcessWMsgs(pVnode, pMsgArr);
|
||||
GTEST_ASSERT_EQ(rcode, 0);
|
||||
vtProcessAndApplyReqs(pVnode, pMsgArr);
|
||||
vtClearMsgBatch(pMsgArr);
|
||||
}
|
||||
|
||||
|
|
|
@ -44,11 +44,13 @@ STQ* tqOpen(const char* path, STqCfg* tqConfig, STqLogReader* tqLogReader, SMemA
|
|||
pTq->path = strdup(path);
|
||||
pTq->tqConfig = tqConfig;
|
||||
pTq->tqLogReader = tqLogReader;
|
||||
#if 0
|
||||
pTq->tqMemRef.pAlloctorFactory = allocFac;
|
||||
pTq->tqMemRef.pAllocator = allocFac->create(allocFac);
|
||||
if (pTq->tqMemRef.pAllocator == NULL) {
|
||||
// TODO: error code of buffer pool
|
||||
}
|
||||
#endif
|
||||
pTq->tqMeta = tqStoreOpen(path, (FTqSerialize)tqSerializeGroup, (FTqDeserialize)tqDeserializeGroup, free, 0);
|
||||
if (pTq->tqMeta == NULL) {
|
||||
// TODO: free STQ
|
||||
|
|
|
@ -1,10 +1,24 @@
|
|||
aux_source_directory(src TSDB_SRC)
|
||||
if(0)
|
||||
add_library(tsdb ${TSDB_SRC})
|
||||
else(0)
|
||||
add_library(tsdb "")
|
||||
target_sources(tsdb
|
||||
PRIVATE
|
||||
"src/tsdbCommit.c"
|
||||
"src/tsdbMain.c"
|
||||
"src/tsdbMemTable.c"
|
||||
"src/tsdbOptions.c"
|
||||
"src/tsdbWrite.c"
|
||||
)
|
||||
endif(0)
|
||||
|
||||
target_include_directories(
|
||||
tsdb
|
||||
PUBLIC "${CMAKE_SOURCE_DIR}/include/dnode/vnode/tsdb"
|
||||
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
|
||||
)
|
||||
|
||||
target_link_libraries(
|
||||
tsdb
|
||||
PUBLIC os
|
||||
|
|
|
@ -16,6 +16,7 @@
|
|||
#ifndef _TD_TSDB_COMMIT_H_
|
||||
#define _TD_TSDB_COMMIT_H_
|
||||
|
||||
#if 0
|
||||
typedef struct {
|
||||
int minFid;
|
||||
int midFid;
|
||||
|
@ -53,5 +54,6 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
|
|||
return -1;
|
||||
}
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /* _TD_TSDB_COMMIT_H_ */
|
|
@ -19,8 +19,12 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#if 0
|
||||
|
||||
void *tsdbCompactImpl(STsdbRepo *pRepo);
|
||||
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
|
@ -16,6 +16,8 @@
|
|||
#ifndef _TD_TSDB_FS_H_
|
||||
#define _TD_TSDB_FS_H_
|
||||
|
||||
#if 0
|
||||
|
||||
#define TSDB_FS_VERSION 0
|
||||
|
||||
// ================== TSDB global config
|
||||
|
@ -113,4 +115,6 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS* pFs) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
#endif /* _TD_TSDB_FS_H_ */
|
|
@ -16,6 +16,8 @@
|
|||
#ifndef _TS_TSDB_FILE_H_
|
||||
#define _TS_TSDB_FILE_H_
|
||||
|
||||
#if 0
|
||||
|
||||
#define TSDB_FILE_HEAD_SIZE 512
|
||||
#define TSDB_FILE_DELIMITER 0xF00AFA0F
|
||||
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
|
||||
|
@ -364,4 +366,5 @@ static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) {
|
|||
return true;
|
||||
}
|
||||
|
||||
#endif
|
||||
#endif /* _TS_TSDB_FILE_H_ */
|
|
@ -16,10 +16,14 @@
|
|||
#ifndef _TD_TSDB_HEALTH_H_
|
||||
#define _TD_TSDB_HEALTH_H_
|
||||
|
||||
#if 0
|
||||
|
||||
bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
|
||||
int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
|
||||
|
||||
bool tsdbIdleMemEnough();
|
||||
bool tsdbAllowNewBlock(STsdbRepo* pRepo);
|
||||
|
||||
#endif
|
||||
|
||||
#endif /* _TD_TSDB_BUFFER_H_ */
|
|
@ -1,27 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_TSDB_IDX_H_
|
||||
#define _TD_TSDB_IDX_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_TSDB_IDX_H_*/
|
|
@ -15,6 +15,7 @@
|
|||
|
||||
#ifndef _TD_TSDB_READ_IMPL_H_
|
||||
#define _TD_TSDB_READ_IMPL_H_
|
||||
#if 0
|
||||
|
||||
#include "tfs.h"
|
||||
#include "tsdb.h"
|
||||
|
@ -150,4 +151,6 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
#endif
|
||||
|
||||
#endif /*_TD_TSDB_READ_IMPL_H_*/
|
|
@ -16,6 +16,8 @@
|
|||
#ifndef TSDB_ROW_MERGE_BUF_H
|
||||
#define TSDB_ROW_MERGE_BUF_H
|
||||
|
||||
#if 0
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
@ -42,4 +44,6 @@ static FORCE_INLINE void tsdbFreeMergeBuf(SMergeBuf buf) {
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif
|
||||
|
||||
#endif /* ifndef TSDB_ROW_MERGE_BUF_H */
|
|
@ -1,27 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_TSDB_SMA_H_
|
||||
#define _TD_TSDB_SMA_H_
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
||||
#endif /*_TD_TSDB_SMA_H_*/
|
|
@ -16,6 +16,7 @@
|
|||
#ifndef _TD_TSDB_INT_H_
|
||||
#define _TD_TSDB_INT_H_
|
||||
|
||||
#if 0
|
||||
// // TODO: remove the include
|
||||
// #include <errno.h>
|
||||
// #include <fcntl.h>
|
||||
|
@ -144,4 +145,5 @@ static FORCE_INLINE int tsdbGetNextMaxTables(int tid) {
|
|||
}
|
||||
#endif
|
||||
|
||||
#endif
|
||||
#endif /* _TD_TSDB_INT_H_ */
|
File diff suppressed because it is too large
Load Diff
|
@ -1,14 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
File diff suppressed because it is too large
Load Diff
|
@ -1,14 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
|
@ -1,16 +0,0 @@
|
|||
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
|
||||
PROJECT(TDengine)
|
||||
|
||||
INCLUDE_DIRECTORIES(inc)
|
||||
AUX_SOURCE_DIRECTORY(src SRC)
|
||||
ADD_LIBRARY(tsdb ${SRC})
|
||||
TARGET_LINK_LIBRARIES(tsdb tfs common tutil)
|
||||
|
||||
IF (TD_TSDB_PLUGINS)
|
||||
TARGET_LINK_LIBRARIES(tsdb tsdbPlugins)
|
||||
ENDIF ()
|
||||
|
||||
IF (TD_LINUX)
|
||||
# Someone has no gtest directory, so comment it
|
||||
# ADD_SUBDIRECTORY(tests)
|
||||
ENDIF ()
|
|
@ -1,51 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_TSDB_BUFFER_H_
|
||||
#define _TD_TSDB_BUFFER_H_
|
||||
|
||||
typedef struct {
|
||||
int64_t blockId;
|
||||
int offset;
|
||||
int remain;
|
||||
char data[];
|
||||
} STsdbBufBlock;
|
||||
|
||||
typedef struct {
|
||||
pthread_cond_t poolNotEmpty;
|
||||
int bufBlockSize;
|
||||
int tBufBlocks;
|
||||
int nBufBlocks;
|
||||
int nRecycleBlocks;
|
||||
int nElasticBlocks;
|
||||
int64_t index;
|
||||
SList* bufBlockList;
|
||||
} STsdbBufPool;
|
||||
|
||||
#define TSDB_BUFFER_RESERVE 1024 // Reseve 1K as commit threshold
|
||||
|
||||
STsdbBufPool* tsdbNewBufPool();
|
||||
void tsdbFreeBufPool(STsdbBufPool* pBufPool);
|
||||
int tsdbOpenBufPool(STsdbRepo* pRepo);
|
||||
void tsdbCloseBufPool(STsdbRepo* pRepo);
|
||||
SListNode* tsdbAllocBufBlockFromPool(STsdbRepo* pRepo);
|
||||
int tsdbExpandPool(STsdbRepo* pRepo, int32_t oldTotalBlocks);
|
||||
void tsdbRecycleBufferBlock(STsdbBufPool* pPool, SListNode *pNode, bool bELastic);
|
||||
|
||||
// health cite
|
||||
STsdbBufBlock *tsdbNewBufBlock(int bufBlockSize);
|
||||
void tsdbFreeBufBlock(STsdbBufBlock *pBufBlock);
|
||||
|
||||
#endif /* _TD_TSDB_BUFFER_H_ */
|
|
@ -1,28 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_TSDB_LOG_H_
|
||||
#define _TD_TSDB_LOG_H_
|
||||
|
||||
extern int32_t tsdbDebugFlag;
|
||||
|
||||
#define tsdbFatal(...) do { if (tsdbDebugFlag & DEBUG_FATAL) { taosPrintLog("TDB FATAL ", 255, __VA_ARGS__); }} while(0)
|
||||
#define tsdbError(...) do { if (tsdbDebugFlag & DEBUG_ERROR) { taosPrintLog("TDB ERROR ", 255, __VA_ARGS__); }} while(0)
|
||||
#define tsdbWarn(...) do { if (tsdbDebugFlag & DEBUG_WARN) { taosPrintLog("TDB WARN ", 255, __VA_ARGS__); }} while(0)
|
||||
#define tsdbInfo(...) do { if (tsdbDebugFlag & DEBUG_INFO) { taosPrintLog("TDB ", 255, __VA_ARGS__); }} while(0)
|
||||
#define tsdbDebug(...) do { if (tsdbDebugFlag & DEBUG_DEBUG) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TDB ", tsdbDebugFlag, __VA_ARGS__); }} while(0)
|
||||
|
||||
#endif /* _TD_TSDB_LOG_H_ */
|
|
@ -1,153 +0,0 @@
|
|||
/*
|
||||
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
|
||||
*
|
||||
* This program is free software: you can use, redistribute, and/or modify
|
||||
* it under the terms of the GNU Affero General Public License, version 3
|
||||
* or later ("AGPL"), as published by the Free Software Foundation.
|
||||
*
|
||||
* This program is distributed in the hope that it will be useful, but WITHOUT
|
||||
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
|
||||
* FITNESS FOR A PARTICULAR PURPOSE.
|
||||
*
|
||||
* You should have received a copy of the GNU Affero General Public License
|
||||
* along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
*/
|
||||
|
||||
#ifndef _TD_TSDB_META_H_
|
||||
#define _TD_TSDB_META_H_
|
||||
|
||||
#define TSDB_MAX_TABLE_SCHEMAS 16
|
||||
|
||||
typedef struct STable {
|
||||
STableId tableId;
|
||||
ETableType type;
|
||||
tstr* name; // NOTE: there a flexible string here
|
||||
uint64_t suid;
|
||||
struct STable* pSuper; // super table pointer
|
||||
SArray* schema;
|
||||
STSchema* tagSchema;
|
||||
SKVRow tagVal;
|
||||
SSkipList* pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
|
||||
void* eventHandler; // TODO
|
||||
void* streamHandler; // TODO
|
||||
TSKEY lastKey;
|
||||
SMemRow lastRow;
|
||||
char* sql;
|
||||
void* cqhandle;
|
||||
SRWLatch latch; // TODO: implementa latch functions
|
||||
|
||||
SDataCol *lastCols;
|
||||
int16_t maxColNum;
|
||||
int16_t restoreColumnNum;
|
||||
bool hasRestoreLastColumn;
|
||||
int lastColSVersion;
|
||||
T_REF_DECLARE()
|
||||
} STable;
|
||||
|
||||
typedef struct {
|
||||
pthread_rwlock_t rwLock;
|
||||
|
||||
int32_t nTables;
|
||||
int32_t maxTables;
|
||||
STable** tables;
|
||||
SList* superList;
|
||||
SHashObj* uidMap;
|
||||
int maxRowBytes;
|
||||
int maxCols;
|
||||
} STsdbMeta;
|
||||
|
||||
#define TSDB_INIT_NTABLES 1024
|
||||
#define TABLE_TYPE(t) (t)->type
|
||||
#define TABLE_NAME(t) (t)->name
|
||||
#define TABLE_CHAR_NAME(t) TABLE_NAME(t)->data
|
||||
#define TABLE_UID(t) (t)->tableId.uid
|
||||
#define TABLE_TID(t) (t)->tableId.tid
|
||||
#define TABLE_SUID(t) (t)->suid
|
||||
// #define TSDB_META_FILE_MAGIC(m) KVSTORE_MAGIC((m)->pStore)
|
||||
#define TSDB_RLOCK_TABLE(t) taosRLockLatch(&((t)->latch))
|
||||
#define TSDB_RUNLOCK_TABLE(t) taosRUnLockLatch(&((t)->latch))
|
||||
#define TSDB_WLOCK_TABLE(t) taosWLockLatch(&((t)->latch))
|
||||
#define TSDB_WUNLOCK_TABLE(t) taosWUnLockLatch(&((t)->latch))
|
||||
|
||||
STsdbMeta* tsdbNewMeta(STsdbCfg* pCfg);
|
||||
void tsdbFreeMeta(STsdbMeta* pMeta);
|
||||
int tsdbOpenMeta(STsdbRepo* pRepo);
|
||||
int tsdbCloseMeta(STsdbRepo* pRepo);
|
||||
STable* tsdbGetTableByUid(STsdbMeta* pMeta, uint64_t uid);
|
||||
STSchema* tsdbGetTableSchemaByVersion(STable* pTable, int16_t _version);
|
||||
int tsdbWLockRepoMeta(STsdbRepo* pRepo);
|
||||
int tsdbRLockRepoMeta(STsdbRepo* pRepo);
|
||||
int tsdbUnlockRepoMeta(STsdbRepo* pRepo);
|
||||
void tsdbRefTable(STable* pTable);
|
||||
void tsdbUnRefTable(STable* pTable);
|
||||
void tsdbUpdateTableSchema(STsdbRepo* pRepo, STable* pTable, STSchema* pSchema, bool insertAct);
|
||||
int tsdbRestoreTable(STsdbRepo* pRepo, void* cont, int contLen);
|
||||
void tsdbOrgMeta(STsdbRepo* pRepo);
|
||||
int tsdbInitColIdCacheWithSchema(STable* pTable, STSchema* pSchema);
|
||||
int16_t tsdbGetLastColumnsIndexByColId(STable* pTable, int16_t colId);
|
||||
int tsdbUpdateLastColSchema(STable *pTable, STSchema *pNewSchema);
|
||||
STSchema* tsdbGetTableLatestSchema(STable *pTable);
|
||||
void tsdbFreeLastColumns(STable* pTable);
|
||||
|
||||
static FORCE_INLINE int tsdbCompareSchemaVersion(const void *key1, const void *key2) {
|
||||
if (*(int16_t *)key1 < schemaVersion(*(STSchema **)key2)) {
|
||||
return -1;
|
||||
} else if (*(int16_t *)key1 > schemaVersion(*(STSchema **)key2)) {
|
||||
return 1;
|
||||
} else {
|
||||
return 0;
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE STSchema* tsdbGetTableSchemaImpl(STable* pTable, bool lock, bool copy, int16_t _version) {
|
||||
STable* pDTable = (pTable->pSuper != NULL) ? pTable->pSuper : pTable; // for performance purpose
|
||||
STSchema* pSchema = NULL;
|
||||
STSchema* pTSchema = NULL;
|
||||
|
||||
if (lock) TSDB_RLOCK_TABLE(pDTable);
|
||||
if (_version < 0) { // get the latest version of schema
|
||||
pTSchema = *(STSchema **)taosArrayGetLast(pDTable->schema);
|
||||
} else { // get the schema with version
|
||||
void* ptr = taosArraySearch(pDTable->schema, &_version, tsdbCompareSchemaVersion, TD_EQ);
|
||||
if (ptr == NULL) {
|
||||
terrno = TSDB_CODE_TDB_IVD_TB_SCHEMA_VERSION;
|
||||
goto _exit;
|
||||
}
|
||||
pTSchema = *(STSchema**)ptr;
|
||||
}
|
||||
|
||||
ASSERT(pTSchema != NULL);
|
||||
|
||||
if (copy) {
|
||||
if ((pSchema = tdDupSchema(pTSchema)) == NULL) terrno = TSDB_CODE_TDB_OUT_OF_MEMORY;
|
||||
} else {
|
||||
pSchema = pTSchema;
|
||||
}
|
||||
|
||||
_exit:
|
||||
if (lock) TSDB_RUNLOCK_TABLE(pDTable);
|
||||
return pSchema;
|
||||
}
|
||||
|
||||
static FORCE_INLINE STSchema* tsdbGetTableSchema(STable* pTable) {
|
||||
return tsdbGetTableSchemaImpl(pTable, false, false, -1);
|
||||
}
|
||||
|
||||
static FORCE_INLINE STSchema *tsdbGetTableTagSchema(STable *pTable) {
|
||||
if (pTable->type == TSDB_CHILD_TABLE) { // check child table first
|
||||
STable *pSuper = pTable->pSuper;
|
||||
if (pSuper == NULL) return NULL;
|
||||
return pSuper->tagSchema;
|
||||
} else if (pTable->type == TSDB_SUPER_TABLE) {
|
||||
return pTable->tagSchema;
|
||||
} else {
|
||||
return NULL;
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE TSKEY tsdbGetTableLastKeyImpl(STable* pTable) {
|
||||
ASSERT((pTable->lastRow == NULL) || (pTable->lastKey == memRowKey(pTable->lastRow)));
|
||||
return pTable->lastKey;
|
||||
}
|
||||
|
||||
#endif /* _TD_TSDB_META_H_ */
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -1,6 +0,0 @@
|
|||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
||||
|
||||
add_executable(tsdbTests ${SOURCE_LIST})
|
||||
target_link_libraries(tsdbTests gtest gtest_main pthread common tsdb tutil trpc)
|
||||
|
||||
add_test(NAME unit COMMAND ${CMAKE_CURRENT_BINARY_DIR}/tsdbTests)
|
|
@ -1,163 +0,0 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <stdlib.h>
|
||||
#include <sys/time.h>
|
||||
|
||||
#include "tsdb.h"
|
||||
#include "tsdbMain.h"
|
||||
|
||||
static double getCurTime() {
|
||||
struct timeval tv;
|
||||
gettimeofday(&tv, NULL);
|
||||
return tv.tv_sec + tv.tv_usec * 1E-6;
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
STsdbRepo *pRepo;
|
||||
bool isAscend;
|
||||
int tid;
|
||||
uint64_t uid;
|
||||
int sversion;
|
||||
TSKEY startTime;
|
||||
TSKEY interval;
|
||||
int totalRows;
|
||||
int rowsPerSubmit;
|
||||
STSchema * pSchema;
|
||||
} SInsertInfo;
|
||||
|
||||
static int insertData(SInsertInfo *pInfo) {
|
||||
SSubmitMsg *pMsg =
|
||||
(SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + dataRowMaxBytesFromSchema(pInfo->pSchema) * pInfo->rowsPerSubmit);
|
||||
if (pMsg == NULL) return -1;
|
||||
TSKEY start_time = pInfo->startTime;
|
||||
|
||||
// Loop to write data
|
||||
double stime = getCurTime();
|
||||
|
||||
for (int k = 0; k < pInfo->totalRows/pInfo->rowsPerSubmit; k++) {
|
||||
memset((void *)pMsg, 0, sizeof(SSubmitMsg));
|
||||
SSubmitBlk *pBlock = (SSubmitBlk *)pMsg->blocks;
|
||||
pBlock->uid = pInfo->uid;
|
||||
pBlock->tid = pInfo->tid;
|
||||
pBlock->sversion = pInfo->sversion;
|
||||
pBlock->dataLen = 0;
|
||||
pBlock->schemaLen = 0;
|
||||
pBlock->numOfRows = 0;
|
||||
for (int i = 0; i < pInfo->rowsPerSubmit; i++) {
|
||||
// start_time += 1000;
|
||||
if (pInfo->isAscend) {
|
||||
start_time += pInfo->interval;
|
||||
} else {
|
||||
start_time -= pInfo->interval;
|
||||
}
|
||||
SDataRow row = (SDataRow)(pBlock->data + pBlock->dataLen);
|
||||
tdInitDataRow(row, pInfo->pSchema);
|
||||
|
||||
for (int j = 0; j < schemaNCols(pInfo->pSchema); j++) {
|
||||
STColumn *pTCol = schemaColAt(pInfo->pSchema, j);
|
||||
if (j == 0) { // Just for timestamp
|
||||
tdAppendColVal(row, (void *)(&start_time), pTCol->type, pTCol->offset);
|
||||
} else { // For int
|
||||
int val = 10;
|
||||
tdAppendColVal(row, (void *)(&val), pTCol->type, pTCol->offset);
|
||||
}
|
||||
}
|
||||
pBlock->dataLen += dataRowLen(row);
|
||||
pBlock->numOfRows++;
|
||||
}
|
||||
pMsg->length = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pBlock->dataLen;
|
||||
pMsg->numOfBlocks = 1;
|
||||
|
||||
pBlock->dataLen = htonl(pBlock->dataLen);
|
||||
pBlock->numOfRows = htonl(pBlock->numOfRows);
|
||||
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
||||
pBlock->uid = htobe64(pBlock->uid);
|
||||
pBlock->tid = htonl(pBlock->tid);
|
||||
|
||||
pBlock->sversion = htonl(pBlock->sversion);
|
||||
pBlock->padding = htonl(pBlock->padding);
|
||||
|
||||
pMsg->length = htonl(pMsg->length);
|
||||
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||
|
||||
if (tsdbInsertData(pInfo->pRepo, pMsg, NULL) < 0) {
|
||||
tfree(pMsg);
|
||||
return -1;
|
||||
}
|
||||
}
|
||||
|
||||
double etime = getCurTime();
|
||||
|
||||
printf("Spent %f seconds to write %d records\n", etime - stime, pInfo->totalRows);
|
||||
tfree(pMsg);
|
||||
return 0;
|
||||
}
|
||||
|
||||
static void tsdbSetCfg(STsdbCfg *pCfg, int32_t tsdbId, int32_t cacheBlockSize, int32_t totalBlocks, int32_t maxTables,
|
||||
int32_t daysPerFile, int32_t keep, int32_t minRows, int32_t maxRows, int8_t precision,
|
||||
int8_t compression) {
|
||||
pCfg->tsdbId = tsdbId;
|
||||
pCfg->cacheBlockSize = cacheBlockSize;
|
||||
pCfg->totalBlocks = totalBlocks;
|
||||
// pCfg->maxTables = maxTables;
|
||||
pCfg->daysPerFile = daysPerFile;
|
||||
pCfg->keep = keep;
|
||||
pCfg->minRowsPerFileBlock = minRows;
|
||||
pCfg->maxRowsPerFileBlock = maxRows;
|
||||
pCfg->precision = precision;
|
||||
pCfg->compression = compression;
|
||||
}
|
||||
|
||||
static void tsdbSetTableCfg(STableCfg *pCfg) {
|
||||
STSchemaBuilder schemaBuilder = {0};
|
||||
|
||||
pCfg->type = TSDB_NORMAL_TABLE;
|
||||
pCfg->superUid = TSDB_INVALID_SUPER_TABLE_ID;
|
||||
pCfg->tableId.tid = 1;
|
||||
pCfg->tableId.uid = 5849583783847394;
|
||||
tdInitTSchemaBuilder(&schemaBuilder, 0);
|
||||
|
||||
int colId = 0;
|
||||
for (int i = 0; i < 5; i++) {
|
||||
tdAddColToSchema(&schemaBuilder, (colId == 0) ? TSDB_DATA_TYPE_TIMESTAMP : TSDB_DATA_TYPE_INT, colId, 0);
|
||||
colId++;
|
||||
}
|
||||
|
||||
pCfg->schema = tdGetSchemaFromBuilder(&schemaBuilder);
|
||||
pCfg->name = strdup("t1");
|
||||
|
||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
||||
}
|
||||
|
||||
TEST(TsdbTest, testInsertSpeed) {
|
||||
int vnode = 1;
|
||||
int ret = 0;
|
||||
STsdbCfg tsdbCfg;
|
||||
STableCfg tableCfg;
|
||||
std::string testDir = "./test";
|
||||
char * rootDir = strdup((testDir + "/vnode" + std::to_string(vnode)).c_str());
|
||||
|
||||
tsdbDebugFlag = 131; //NOTE: you must set the flag
|
||||
|
||||
taosRemoveDir(rootDir);
|
||||
|
||||
// Create and open repository
|
||||
tsdbSetCfg(&tsdbCfg, 1, 16, 4, -1, -1, -1, -1, -1, -1, -1);
|
||||
tsdbCreateRepo(rootDir, &tsdbCfg);
|
||||
STsdbRepo *repo = tsdbOpenRepo(rootDir, NULL);
|
||||
ASSERT_NE(repo, nullptr);
|
||||
|
||||
// Create table
|
||||
tsdbSetTableCfg(&tableCfg);
|
||||
tsdbCreateTable(repo, &tableCfg);
|
||||
|
||||
// Insert data
|
||||
SInsertInfo iInfo = {repo, true, 1, 5849583783847394, 0, 1590000000000, 10, 10000000, 100, tableCfg.schema};
|
||||
|
||||
insertData(&iInfo);
|
||||
|
||||
tsdbCloseRepo(repo, 1);
|
||||
}
|
||||
|
||||
static char *getTKey(const void *data) {
|
||||
return (char *)data;
|
||||
}
|
Loading…
Reference in New Issue