merge TSDB file
This commit is contained in:
parent
b6e7922099
commit
816a9d9303
|
@ -16,6 +16,7 @@
|
||||||
#ifndef _TD_TSDB_COMMIT_H_
|
#ifndef _TD_TSDB_COMMIT_H_
|
||||||
#define _TD_TSDB_COMMIT_H_
|
#define _TD_TSDB_COMMIT_H_
|
||||||
|
|
||||||
|
#if 0
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int minFid;
|
int minFid;
|
||||||
int midFid;
|
int midFid;
|
||||||
|
@ -53,5 +54,6 @@ static FORCE_INLINE int tsdbGetFidLevel(int fid, SRtn *pRtn) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif /* _TD_TSDB_COMMIT_H_ */
|
#endif /* _TD_TSDB_COMMIT_H_ */
|
|
@ -19,8 +19,12 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
|
||||||
void *tsdbCompactImpl(STsdbRepo *pRepo);
|
void *tsdbCompactImpl(STsdbRepo *pRepo);
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
|
@ -16,6 +16,8 @@
|
||||||
#ifndef _TD_TSDB_FS_H_
|
#ifndef _TD_TSDB_FS_H_
|
||||||
#define _TD_TSDB_FS_H_
|
#define _TD_TSDB_FS_H_
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
|
||||||
#define TSDB_FS_VERSION 0
|
#define TSDB_FS_VERSION 0
|
||||||
|
|
||||||
// ================== TSDB global config
|
// ================== TSDB global config
|
||||||
|
@ -113,4 +115,6 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS* pFs) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif /* _TD_TSDB_FS_H_ */
|
#endif /* _TD_TSDB_FS_H_ */
|
|
@ -16,6 +16,8 @@
|
||||||
#ifndef _TS_TSDB_FILE_H_
|
#ifndef _TS_TSDB_FILE_H_
|
||||||
#define _TS_TSDB_FILE_H_
|
#define _TS_TSDB_FILE_H_
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
|
||||||
#define TSDB_FILE_HEAD_SIZE 512
|
#define TSDB_FILE_HEAD_SIZE 512
|
||||||
#define TSDB_FILE_DELIMITER 0xF00AFA0F
|
#define TSDB_FILE_DELIMITER 0xF00AFA0F
|
||||||
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
|
#define TSDB_FILE_INIT_MAGIC 0xFFFFFFFF
|
||||||
|
@ -364,4 +366,5 @@ static FORCE_INLINE bool tsdbFSetIsOk(SDFileSet* pSet) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
#endif /* _TS_TSDB_FILE_H_ */
|
#endif /* _TS_TSDB_FILE_H_ */
|
|
@ -16,10 +16,14 @@
|
||||||
#ifndef _TD_TSDB_HEALTH_H_
|
#ifndef _TD_TSDB_HEALTH_H_
|
||||||
#define _TD_TSDB_HEALTH_H_
|
#define _TD_TSDB_HEALTH_H_
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
|
||||||
bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
|
bool tsdbUrgeQueryFree(STsdbRepo* pRepo);
|
||||||
int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
|
int32_t tsdbInsertNewBlock(STsdbRepo* pRepo);
|
||||||
|
|
||||||
bool tsdbIdleMemEnough();
|
bool tsdbIdleMemEnough();
|
||||||
bool tsdbAllowNewBlock(STsdbRepo* pRepo);
|
bool tsdbAllowNewBlock(STsdbRepo* pRepo);
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif /* _TD_TSDB_BUFFER_H_ */
|
#endif /* _TD_TSDB_BUFFER_H_ */
|
|
@ -15,6 +15,7 @@
|
||||||
|
|
||||||
#ifndef _TD_TSDB_READ_IMPL_H_
|
#ifndef _TD_TSDB_READ_IMPL_H_
|
||||||
#define _TD_TSDB_READ_IMPL_H_
|
#define _TD_TSDB_READ_IMPL_H_
|
||||||
|
#if 0
|
||||||
|
|
||||||
#include "tfs.h"
|
#include "tfs.h"
|
||||||
#include "tsdb.h"
|
#include "tsdb.h"
|
||||||
|
@ -150,4 +151,6 @@ static FORCE_INLINE int tsdbMakeRoom(void **ppBuf, size_t size) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif /*_TD_TSDB_READ_IMPL_H_*/
|
#endif /*_TD_TSDB_READ_IMPL_H_*/
|
|
@ -16,6 +16,8 @@
|
||||||
#ifndef TSDB_ROW_MERGE_BUF_H
|
#ifndef TSDB_ROW_MERGE_BUF_H
|
||||||
#define TSDB_ROW_MERGE_BUF_H
|
#define TSDB_ROW_MERGE_BUF_H
|
||||||
|
|
||||||
|
#if 0
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
@ -42,4 +44,6 @@ static FORCE_INLINE void tsdbFreeMergeBuf(SMergeBuf buf) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#endif
|
||||||
|
|
||||||
#endif /* ifndef TSDB_ROW_MERGE_BUF_H */
|
#endif /* ifndef TSDB_ROW_MERGE_BUF_H */
|
|
@ -16,6 +16,7 @@
|
||||||
#ifndef _TD_TSDB_INT_H_
|
#ifndef _TD_TSDB_INT_H_
|
||||||
#define _TD_TSDB_INT_H_
|
#define _TD_TSDB_INT_H_
|
||||||
|
|
||||||
|
#if 0
|
||||||
// // TODO: remove the include
|
// // TODO: remove the include
|
||||||
// #include <errno.h>
|
// #include <errno.h>
|
||||||
// #include <fcntl.h>
|
// #include <fcntl.h>
|
||||||
|
@ -144,4 +145,5 @@ static FORCE_INLINE int tsdbGetNextMaxTables(int tid) {
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#endif
|
||||||
#endif /* _TD_TSDB_INT_H_ */
|
#endif /* _TD_TSDB_INT_H_ */
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -1,16 +0,0 @@
|
||||||
CMAKE_MINIMUM_REQUIRED(VERSION 2.8...3.20)
|
|
||||||
PROJECT(TDengine)
|
|
||||||
|
|
||||||
INCLUDE_DIRECTORIES(inc)
|
|
||||||
AUX_SOURCE_DIRECTORY(src SRC)
|
|
||||||
ADD_LIBRARY(tsdb ${SRC})
|
|
||||||
TARGET_LINK_LIBRARIES(tsdb tfs common tutil)
|
|
||||||
|
|
||||||
IF (TD_TSDB_PLUGINS)
|
|
||||||
TARGET_LINK_LIBRARIES(tsdb tsdbPlugins)
|
|
||||||
ENDIF ()
|
|
||||||
|
|
||||||
IF (TD_LINUX)
|
|
||||||
# Someone has no gtest directory, so comment it
|
|
||||||
# ADD_SUBDIRECTORY(tests)
|
|
||||||
ENDIF ()
|
|
File diff suppressed because it is too large
Load Diff
File diff suppressed because it is too large
Load Diff
|
@ -1,6 +0,0 @@
|
||||||
AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
|
|
||||||
|
|
||||||
add_executable(tsdbTests ${SOURCE_LIST})
|
|
||||||
target_link_libraries(tsdbTests gtest gtest_main pthread common tsdb tutil trpc)
|
|
||||||
|
|
||||||
add_test(NAME unit COMMAND ${CMAKE_CURRENT_BINARY_DIR}/tsdbTests)
|
|
|
@ -1,163 +0,0 @@
|
||||||
#include <gtest/gtest.h>
|
|
||||||
#include <stdlib.h>
|
|
||||||
#include <sys/time.h>
|
|
||||||
|
|
||||||
#include "tsdb.h"
|
|
||||||
#include "tsdbMain.h"
|
|
||||||
|
|
||||||
static double getCurTime() {
|
|
||||||
struct timeval tv;
|
|
||||||
gettimeofday(&tv, NULL);
|
|
||||||
return tv.tv_sec + tv.tv_usec * 1E-6;
|
|
||||||
}
|
|
||||||
|
|
||||||
typedef struct {
|
|
||||||
STsdbRepo *pRepo;
|
|
||||||
bool isAscend;
|
|
||||||
int tid;
|
|
||||||
uint64_t uid;
|
|
||||||
int sversion;
|
|
||||||
TSKEY startTime;
|
|
||||||
TSKEY interval;
|
|
||||||
int totalRows;
|
|
||||||
int rowsPerSubmit;
|
|
||||||
STSchema * pSchema;
|
|
||||||
} SInsertInfo;
|
|
||||||
|
|
||||||
static int insertData(SInsertInfo *pInfo) {
|
|
||||||
SSubmitMsg *pMsg =
|
|
||||||
(SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + dataRowMaxBytesFromSchema(pInfo->pSchema) * pInfo->rowsPerSubmit);
|
|
||||||
if (pMsg == NULL) return -1;
|
|
||||||
TSKEY start_time = pInfo->startTime;
|
|
||||||
|
|
||||||
// Loop to write data
|
|
||||||
double stime = getCurTime();
|
|
||||||
|
|
||||||
for (int k = 0; k < pInfo->totalRows/pInfo->rowsPerSubmit; k++) {
|
|
||||||
memset((void *)pMsg, 0, sizeof(SSubmitMsg));
|
|
||||||
SSubmitBlk *pBlock = (SSubmitBlk *)pMsg->blocks;
|
|
||||||
pBlock->uid = pInfo->uid;
|
|
||||||
pBlock->tid = pInfo->tid;
|
|
||||||
pBlock->sversion = pInfo->sversion;
|
|
||||||
pBlock->dataLen = 0;
|
|
||||||
pBlock->schemaLen = 0;
|
|
||||||
pBlock->numOfRows = 0;
|
|
||||||
for (int i = 0; i < pInfo->rowsPerSubmit; i++) {
|
|
||||||
// start_time += 1000;
|
|
||||||
if (pInfo->isAscend) {
|
|
||||||
start_time += pInfo->interval;
|
|
||||||
} else {
|
|
||||||
start_time -= pInfo->interval;
|
|
||||||
}
|
|
||||||
SDataRow row = (SDataRow)(pBlock->data + pBlock->dataLen);
|
|
||||||
tdInitDataRow(row, pInfo->pSchema);
|
|
||||||
|
|
||||||
for (int j = 0; j < schemaNCols(pInfo->pSchema); j++) {
|
|
||||||
STColumn *pTCol = schemaColAt(pInfo->pSchema, j);
|
|
||||||
if (j == 0) { // Just for timestamp
|
|
||||||
tdAppendColVal(row, (void *)(&start_time), pTCol->type, pTCol->offset);
|
|
||||||
} else { // For int
|
|
||||||
int val = 10;
|
|
||||||
tdAppendColVal(row, (void *)(&val), pTCol->type, pTCol->offset);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
pBlock->dataLen += dataRowLen(row);
|
|
||||||
pBlock->numOfRows++;
|
|
||||||
}
|
|
||||||
pMsg->length = sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + pBlock->dataLen;
|
|
||||||
pMsg->numOfBlocks = 1;
|
|
||||||
|
|
||||||
pBlock->dataLen = htonl(pBlock->dataLen);
|
|
||||||
pBlock->numOfRows = htonl(pBlock->numOfRows);
|
|
||||||
pBlock->schemaLen = htonl(pBlock->schemaLen);
|
|
||||||
pBlock->uid = htobe64(pBlock->uid);
|
|
||||||
pBlock->tid = htonl(pBlock->tid);
|
|
||||||
|
|
||||||
pBlock->sversion = htonl(pBlock->sversion);
|
|
||||||
pBlock->padding = htonl(pBlock->padding);
|
|
||||||
|
|
||||||
pMsg->length = htonl(pMsg->length);
|
|
||||||
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
|
||||||
|
|
||||||
if (tsdbInsertData(pInfo->pRepo, pMsg, NULL) < 0) {
|
|
||||||
tfree(pMsg);
|
|
||||||
return -1;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
double etime = getCurTime();
|
|
||||||
|
|
||||||
printf("Spent %f seconds to write %d records\n", etime - stime, pInfo->totalRows);
|
|
||||||
tfree(pMsg);
|
|
||||||
return 0;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tsdbSetCfg(STsdbCfg *pCfg, int32_t tsdbId, int32_t cacheBlockSize, int32_t totalBlocks, int32_t maxTables,
|
|
||||||
int32_t daysPerFile, int32_t keep, int32_t minRows, int32_t maxRows, int8_t precision,
|
|
||||||
int8_t compression) {
|
|
||||||
pCfg->tsdbId = tsdbId;
|
|
||||||
pCfg->cacheBlockSize = cacheBlockSize;
|
|
||||||
pCfg->totalBlocks = totalBlocks;
|
|
||||||
// pCfg->maxTables = maxTables;
|
|
||||||
pCfg->daysPerFile = daysPerFile;
|
|
||||||
pCfg->keep = keep;
|
|
||||||
pCfg->minRowsPerFileBlock = minRows;
|
|
||||||
pCfg->maxRowsPerFileBlock = maxRows;
|
|
||||||
pCfg->precision = precision;
|
|
||||||
pCfg->compression = compression;
|
|
||||||
}
|
|
||||||
|
|
||||||
static void tsdbSetTableCfg(STableCfg *pCfg) {
|
|
||||||
STSchemaBuilder schemaBuilder = {0};
|
|
||||||
|
|
||||||
pCfg->type = TSDB_NORMAL_TABLE;
|
|
||||||
pCfg->superUid = TSDB_INVALID_SUPER_TABLE_ID;
|
|
||||||
pCfg->tableId.tid = 1;
|
|
||||||
pCfg->tableId.uid = 5849583783847394;
|
|
||||||
tdInitTSchemaBuilder(&schemaBuilder, 0);
|
|
||||||
|
|
||||||
int colId = 0;
|
|
||||||
for (int i = 0; i < 5; i++) {
|
|
||||||
tdAddColToSchema(&schemaBuilder, (colId == 0) ? TSDB_DATA_TYPE_TIMESTAMP : TSDB_DATA_TYPE_INT, colId, 0);
|
|
||||||
colId++;
|
|
||||||
}
|
|
||||||
|
|
||||||
pCfg->schema = tdGetSchemaFromBuilder(&schemaBuilder);
|
|
||||||
pCfg->name = strdup("t1");
|
|
||||||
|
|
||||||
tdDestroyTSchemaBuilder(&schemaBuilder);
|
|
||||||
}
|
|
||||||
|
|
||||||
TEST(TsdbTest, testInsertSpeed) {
|
|
||||||
int vnode = 1;
|
|
||||||
int ret = 0;
|
|
||||||
STsdbCfg tsdbCfg;
|
|
||||||
STableCfg tableCfg;
|
|
||||||
std::string testDir = "./test";
|
|
||||||
char * rootDir = strdup((testDir + "/vnode" + std::to_string(vnode)).c_str());
|
|
||||||
|
|
||||||
tsdbDebugFlag = 131; //NOTE: you must set the flag
|
|
||||||
|
|
||||||
taosRemoveDir(rootDir);
|
|
||||||
|
|
||||||
// Create and open repository
|
|
||||||
tsdbSetCfg(&tsdbCfg, 1, 16, 4, -1, -1, -1, -1, -1, -1, -1);
|
|
||||||
tsdbCreateRepo(rootDir, &tsdbCfg);
|
|
||||||
STsdbRepo *repo = tsdbOpenRepo(rootDir, NULL);
|
|
||||||
ASSERT_NE(repo, nullptr);
|
|
||||||
|
|
||||||
// Create table
|
|
||||||
tsdbSetTableCfg(&tableCfg);
|
|
||||||
tsdbCreateTable(repo, &tableCfg);
|
|
||||||
|
|
||||||
// Insert data
|
|
||||||
SInsertInfo iInfo = {repo, true, 1, 5849583783847394, 0, 1590000000000, 10, 10000000, 100, tableCfg.schema};
|
|
||||||
|
|
||||||
insertData(&iInfo);
|
|
||||||
|
|
||||||
tsdbCloseRepo(repo, 1);
|
|
||||||
}
|
|
||||||
|
|
||||||
static char *getTKey(const void *data) {
|
|
||||||
return (char *)data;
|
|
||||||
}
|
|
Loading…
Reference in New Issue