Merge branch '2.0' into liaohj_2
This commit is contained in:
commit
a6b0e4a96b
|
@ -150,7 +150,7 @@ void tdFreeSchema(STSchema *pSchema) {
|
|||
*/
|
||||
void tdUpdateSchema(STSchema *pSchema) {
|
||||
STColumn *pCol = NULL;
|
||||
int32_t offset = 0;
|
||||
int32_t offset = TD_DATA_ROW_HEAD_SIZE;
|
||||
for (int i = 0; i < schemaNCols(pSchema); i++) {
|
||||
pCol = schemaColAt(pSchema, i);
|
||||
colSetOffset(pCol, offset);
|
||||
|
|
|
@ -59,6 +59,8 @@ tsdb_repo_t * tsdbOpenRepo(char *tsdbDir);
|
|||
int32_t tsdbCloseRepo(tsdb_repo_t *repo);
|
||||
int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg);
|
||||
int32_t tsdbTriggerCommit(tsdb_repo_t *repo);
|
||||
int32_t tsdbLockRepo(tsdb_repo_t *repo);
|
||||
int32_t tsdbUnLockRepo(tsdb_repo_t *repo);
|
||||
|
||||
// --------- TSDB TABLE DEFINITION
|
||||
typedef struct {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
|
||||
#include "taosdef.h"
|
||||
#include "tlist.h"
|
||||
#include "tsdb.h"
|
||||
|
||||
#ifdef __cplusplus
|
||||
extern "C" {
|
||||
|
@ -49,13 +50,15 @@ typedef struct {
|
|||
typedef struct {
|
||||
int maxBytes;
|
||||
int cacheBlockSize;
|
||||
int totalCacheBlocks;
|
||||
STsdbCachePool pool;
|
||||
STsdbCacheBlock *curBlock;
|
||||
SCacheMem * mem;
|
||||
SCacheMem * imem;
|
||||
tsdb_repo_t * pRepo;
|
||||
} STsdbCache;
|
||||
|
||||
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize);
|
||||
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, tsdb_repo_t *pRepo);
|
||||
void tsdbFreeCache(STsdbCache *pCache);
|
||||
void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key);
|
||||
|
||||
|
|
|
@ -14,12 +14,14 @@
|
|||
*/
|
||||
#include <stdlib.h>
|
||||
|
||||
#include "tsdb.h"
|
||||
#include "tsdbCache.h"
|
||||
|
||||
static int tsdbAllocBlockFromPool(STsdbCache *pCache);
|
||||
static void tsdbFreeBlockList(SCacheMem *mem);
|
||||
static int tsdbAllocBlockFromPool(STsdbCache *pCache);
|
||||
static void tsdbFreeBlockList(SList *list);
|
||||
static void tsdbFreeCacheMem(SCacheMem *mem);
|
||||
|
||||
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) {
|
||||
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize, tsdb_repo_t *pRepo) {
|
||||
STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache));
|
||||
if (pCache == NULL) return NULL;
|
||||
|
||||
|
@ -27,9 +29,11 @@ STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) {
|
|||
|
||||
pCache->maxBytes = maxBytes;
|
||||
pCache->cacheBlockSize = cacheBlockSize;
|
||||
pCache->pRepo = pRepo;
|
||||
|
||||
int nBlocks = maxBytes / cacheBlockSize + 1;
|
||||
if (nBlocks <= 1) nBlocks = 2;
|
||||
pCache->totalCacheBlocks = nBlocks;
|
||||
|
||||
STsdbCachePool *pPool = &(pCache->pool);
|
||||
pPool->index = 0;
|
||||
|
@ -57,8 +61,8 @@ _err:
|
|||
}
|
||||
|
||||
void tsdbFreeCache(STsdbCache *pCache) {
|
||||
tsdbFreeBlockList(pCache->imem);
|
||||
tsdbFreeBlockList(pCache->mem);
|
||||
tsdbFreeCacheMem(pCache->imem);
|
||||
tsdbFreeCacheMem(pCache->mem);
|
||||
tsdbFreeBlockList(pCache->pool.memPool);
|
||||
free(pCache);
|
||||
}
|
||||
|
@ -67,22 +71,10 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) {
|
|||
if (pCache == NULL) return NULL;
|
||||
if (bytes > pCache->cacheBlockSize) return NULL;
|
||||
|
||||
if (pCache->mem == NULL) { // Create a new one
|
||||
pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem));
|
||||
if (pCache->mem == NULL) return NULL;
|
||||
pCache->mem->keyFirst = INT64_MAX;
|
||||
pCache->mem->keyLast = 0;
|
||||
pCache->mem->numOfPoints = 0;
|
||||
pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *));
|
||||
}
|
||||
|
||||
if (isListEmpty(pCache->mem->list)) {
|
||||
if (tsdbAllocBlockFromPool(pCache) < 0) {
|
||||
// TODO: deal with the error
|
||||
if (pCache->curBlock == NULL || pCache->curBlock->remain < bytes) {
|
||||
if (pCache->curBlock !=NULL && (pCache->mem->list) >= pCache->totalCacheBlocks/2) {
|
||||
tsdbTriggerCommit(pCache->pRepo);
|
||||
}
|
||||
}
|
||||
|
||||
if (pCache->curBlock->remain < bytes) {
|
||||
if (tsdbAllocBlockFromPool(pCache) < 0) {
|
||||
// TODO: deal with the error
|
||||
}
|
||||
|
@ -99,9 +91,7 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) {
|
|||
return ptr;
|
||||
}
|
||||
|
||||
static void tsdbFreeBlockList(SCacheMem *mem) {
|
||||
if (mem == NULL) return;
|
||||
SList * list = mem->list;
|
||||
static void tsdbFreeBlockList(SList *list) {
|
||||
SListNode * node = NULL;
|
||||
STsdbCacheBlock *pBlock = NULL;
|
||||
while ((node = tdListPopHead(list)) != NULL) {
|
||||
|
@ -110,12 +100,23 @@ static void tsdbFreeBlockList(SCacheMem *mem) {
|
|||
listNodeFree(node);
|
||||
}
|
||||
tdListFree(list);
|
||||
}
|
||||
|
||||
static void tsdbFreeCacheMem(SCacheMem *mem) {
|
||||
if (mem == NULL) return;
|
||||
SList *list = mem->list;
|
||||
tsdbFreeBlockList(list);
|
||||
free(mem);
|
||||
}
|
||||
|
||||
static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
|
||||
STsdbCachePool *pPool = &(pCache->pool);
|
||||
if (listNEles(pPool->memPool) == 0) return -1;
|
||||
|
||||
tsdbLockRepo(pCache->pRepo);
|
||||
if (listNEles(pPool->memPool) == 0) {
|
||||
tsdbUnLockRepo(pCache->pRepo);
|
||||
return -1;
|
||||
}
|
||||
|
||||
SListNode *node = tdListPopHead(pPool->memPool);
|
||||
|
||||
|
@ -125,8 +126,19 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
|
|||
pBlock->offset = 0;
|
||||
pBlock->remain = pCache->cacheBlockSize;
|
||||
|
||||
if (pCache->mem == NULL) { // Create a new one
|
||||
pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem));
|
||||
if (pCache->mem == NULL) return NULL;
|
||||
pCache->mem->keyFirst = INT64_MAX;
|
||||
pCache->mem->keyLast = 0;
|
||||
pCache->mem->numOfPoints = 0;
|
||||
pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *));
|
||||
}
|
||||
|
||||
tdListAppendNode(pCache->mem->list, node);
|
||||
pCache->curBlock = pBlock;
|
||||
|
||||
tsdbUnLockRepo(pCache->pRepo);
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -150,6 +150,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO
|
|||
pRepo->rootDir = strdup(rootDir);
|
||||
pRepo->config = *pCfg;
|
||||
pRepo->limiter = limiter;
|
||||
pthread_mutex_init(&pRepo->mutex, NULL);
|
||||
|
||||
// Create the environment files and directories
|
||||
if (tsdbSetRepoEnv(pRepo) < 0) {
|
||||
|
@ -168,7 +169,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO
|
|||
pRepo->tsdbMeta = pMeta;
|
||||
|
||||
// Initialize cache
|
||||
STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1);
|
||||
STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1, (tsdb_repo_t *)pRepo);
|
||||
if (pCache == NULL) {
|
||||
free(pRepo->rootDir);
|
||||
tsdbFreeMeta(pRepo->tsdbMeta);
|
||||
|
@ -249,7 +250,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) {
|
|||
return NULL;
|
||||
}
|
||||
|
||||
pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize, -1);
|
||||
pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize, -1, (tsdb_repo_t *)pRepo);
|
||||
if (pRepo->tsdbCache == NULL) {
|
||||
tsdbFreeMeta(pRepo->tsdbMeta);
|
||||
free(pRepo->rootDir);
|
||||
|
@ -305,9 +306,12 @@ int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg) {
|
|||
|
||||
int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
|
||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
|
||||
if (pthread_mutex_lock(&(pRepo->mutex)) < 0) return -1;
|
||||
if (pRepo->commit) return 0;
|
||||
|
||||
tsdbLockRepo(repo);
|
||||
if (pRepo->commit) {
|
||||
tsdbUnLockRepo(repo);
|
||||
return -1;
|
||||
}
|
||||
pRepo->commit = 1;
|
||||
// Loop to move pData to iData
|
||||
for (int i = 0; i < pRepo->config.maxTables; i++) {
|
||||
|
@ -320,15 +324,25 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
|
|||
// TODO: Loop to move mem to imem
|
||||
pRepo->tsdbCache->imem = pRepo->tsdbCache->mem;
|
||||
pRepo->tsdbCache->mem = NULL;
|
||||
pRepo->tsdbCache->curBlock = NULL;
|
||||
|
||||
// TODO: here should set as detached or use join for memory leak
|
||||
pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo);
|
||||
pthread_mutex_unlock(&(pRepo->mutex));
|
||||
|
||||
pthread_join(pRepo->commitThread, NULL);
|
||||
tsdbUnLockRepo(repo);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
int32_t tsdbLockRepo(tsdb_repo_t *repo) {
|
||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
return pthread_mutex_lock(repo);
|
||||
}
|
||||
|
||||
int32_t tsdbUnLockRepo(tsdb_repo_t *repo) {
|
||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||
return pthread_mutex_unlock(repo);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the TSDB repository information, including some statistics
|
||||
* @param pRepo the TSDB repository handle
|
||||
|
@ -698,7 +712,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
|
|||
tSkipListRandNodeInfo(pTable->mem->pData, &level, &headSize);
|
||||
|
||||
TSKEY key = dataRowKey(row);
|
||||
printf("insert:%lld, size:%d\n", key, pTable->mem->numOfPoints);
|
||||
// printf("insert:%lld, size:%d\n", key, pTable->mem->numOfPoints);
|
||||
|
||||
// Copy row into the memory
|
||||
SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row), key);
|
||||
|
@ -710,6 +724,13 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
|
|||
dataRowCpy(SL_GET_NODE_DATA(pNode), row);
|
||||
|
||||
// Insert the skiplist node into the data
|
||||
if (pTable->mem == NULL) {
|
||||
pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable));
|
||||
if (pTable->mem == NULL) return -1;
|
||||
pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey);
|
||||
pTable->mem->keyFirst = INT64_MAX;
|
||||
pTable->mem->keyLast = 0;
|
||||
}
|
||||
tSkipListPut(pTable->mem->pData, pNode);
|
||||
if (key > pTable->mem->keyLast) pTable->mem->keyLast = key;
|
||||
if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key;
|
||||
|
@ -740,7 +761,7 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
|
|||
return 0;
|
||||
}
|
||||
|
||||
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, void *dst) {
|
||||
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead, SDataCol **cols, STSchema *pSchema) {
|
||||
int numOfRows = 0;
|
||||
do {
|
||||
SSkipListNode *node = tSkipListIterGet(pIter);
|
||||
|
@ -749,6 +770,11 @@ static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY maxKey, int max
|
|||
SDataRow row = SL_GET_NODE_DATA(node);
|
||||
if (dataRowKey(row) > maxKey) break;
|
||||
// Convert row data to column data
|
||||
// for (int i = 0; i < schemaNCols(pSchema); i++) {
|
||||
// STColumn *pCol = schemaColAt(pSchema, i);
|
||||
// memcpy(cols[i]->data + TYPE_BYTES[colType(pCol)] * numOfRows, dataRowAt(row, pCol->offset),
|
||||
// TYPE_BYTES[colType(pCol)]);
|
||||
// }
|
||||
|
||||
numOfRows++;
|
||||
if (numOfRows > maxRowsToRead) break;
|
||||
|
@ -776,6 +802,8 @@ static void *tsdbCommitToFile(void *arg) {
|
|||
|
||||
int maxCols = pMeta->maxCols;
|
||||
int maxBytes = pMeta->maxRowBytes;
|
||||
SDataCol **cols = (SDataCol **)malloc(sizeof(SDataCol *) * maxCols);
|
||||
void *buf = malloc((maxBytes + sizeof(SDataCol)) * pCfg->maxRowsPerFileBlock);
|
||||
|
||||
for (int fid = sfid; fid <= efid; fid++) {
|
||||
TSKEY minKey = 0, maxKey = 0;
|
||||
|
@ -793,9 +821,17 @@ static void *tsdbCommitToFile(void *arg) {
|
|||
}
|
||||
}
|
||||
|
||||
// Init row data part
|
||||
cols[0] = (SDataCol *)buf;
|
||||
for (int col = 1; col < schemaNCols(pTable->schema); col++) {
|
||||
cols[col] = (SDataCol *)((char *)(cols[col - 1]) + sizeof(SDataCol) + colBytes(schemaColAt(pTable->schema, col-1)) * pCfg->maxRowsPerFileBlock);
|
||||
}
|
||||
|
||||
// Loop the iterator
|
||||
int rowsRead = 0;
|
||||
while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, NULL)) > 0) {
|
||||
while ((rowsRead = tsdbReadRowsFromCache(iters[tid], maxKey, pCfg->maxRowsPerFileBlock, cols, pTable->schema)) >
|
||||
0) {
|
||||
// printf("rowsRead:%d-----------\n", rowsRead);
|
||||
int k = 0;
|
||||
}
|
||||
}
|
||||
|
@ -806,7 +842,23 @@ static void *tsdbCommitToFile(void *arg) {
|
|||
if (iters[tid] != NULL) tSkipListDestroyIter(iters[tid]);
|
||||
}
|
||||
|
||||
free(buf);
|
||||
free(cols);
|
||||
free(iters);
|
||||
|
||||
tsdbLockRepo(arg);
|
||||
tdListMove(pCache->imem->list, pCache->pool.memPool);
|
||||
free(pCache->imem);
|
||||
pCache->imem = NULL;
|
||||
pRepo->commit = 0;
|
||||
// TODO: free the skiplist
|
||||
for (int i = 0; i < pCfg->maxTables; i++) {
|
||||
STable *pTable = pMeta->tables[i];
|
||||
if (pTable && pTable->imem) { // Here has memory leak
|
||||
pTable->imem = NULL;
|
||||
}
|
||||
}
|
||||
tsdbUnLockRepo(arg);
|
||||
|
||||
return NULL;
|
||||
}
|
|
@ -236,6 +236,10 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
|
|||
table->type = TSDB_NORMAL_TABLE;
|
||||
table->superUid = -1;
|
||||
table->schema = tdDupSchema(pCfg->schema);
|
||||
if (schemaNCols(table->schema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(table->schema);
|
||||
tdUpdateSchema(table->schema);
|
||||
int bytes = tdMaxRowBytesFromSchema(table->schema);
|
||||
if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes;
|
||||
}
|
||||
|
||||
// Register to meta
|
||||
|
|
|
@ -1,12 +1,19 @@
|
|||
#include <gtest/gtest.h>
|
||||
#include <stdlib.h>
|
||||
#include <sys/time.h>
|
||||
|
||||
#include "tsdb.h"
|
||||
#include "dataformat.h"
|
||||
#include "tsdbFile.h"
|
||||
#include "tsdbMeta.h"
|
||||
|
||||
TEST(TsdbTest, tableEncodeDecode) {
|
||||
double getCurTime() {
|
||||
struct timeval tv;
|
||||
gettimeofday(&tv, NULL);
|
||||
return tv.tv_sec + tv.tv_usec * 1E-6;
|
||||
}
|
||||
|
||||
TEST(TsdbTest, DISABLED_tableEncodeDecode) {
|
||||
STable *pTable = (STable *)malloc(sizeof(STable));
|
||||
|
||||
pTable->type = TSDB_NORMAL_TABLE;
|
||||
|
@ -71,19 +78,23 @@ TEST(TsdbTest, createRepo) {
|
|||
tsdbCreateTable(pRepo, &tCfg);
|
||||
|
||||
// // 3. Loop to write some simple data
|
||||
int nRows = 1000;
|
||||
int nRows = 10000000;
|
||||
int rowsPerSubmit = 10;
|
||||
int64_t start_time = 1584081000000;
|
||||
|
||||
SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit);
|
||||
|
||||
double stime = getCurTime();
|
||||
|
||||
for (int k = 0; k < nRows/rowsPerSubmit; k++) {
|
||||
SSubmitBlk *pBlock = pMsg->blocks;
|
||||
pBlock->tableId = {.uid = 987607499877672L, .tid = 0};
|
||||
pBlock->uid = 987607499877672L;
|
||||
pBlock->tid = 0;
|
||||
pBlock->sversion = 0;
|
||||
pBlock->len = 0;
|
||||
for (int i = 0; i < rowsPerSubmit; i++) {
|
||||
start_time += 1000;
|
||||
// start_time += 1000;
|
||||
start_time -= 1000;
|
||||
SDataRow row = (SDataRow)(pBlock->data + pBlock->len);
|
||||
tdInitDataRow(row, schema);
|
||||
|
||||
|
@ -97,21 +108,38 @@ TEST(TsdbTest, createRepo) {
|
|||
}
|
||||
pBlock->len += dataRowLen(row);
|
||||
}
|
||||
pBlock->len = htonl(pBlock->len);
|
||||
pBlock->numOfRows = htonl(pBlock->numOfRows);
|
||||
pBlock->uid = htobe64(pBlock->uid);
|
||||
pBlock->tid = htonl(pBlock->tid);
|
||||
|
||||
pBlock->sversion = htonl(pBlock->sversion);
|
||||
pBlock->padding = htonl(pBlock->padding);
|
||||
|
||||
pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
|
||||
pMsg->length = htonl(pMsg->length);
|
||||
pMsg->numOfBlocks = htonl(pMsg->numOfBlocks);
|
||||
pMsg->compressed = htonl(pMsg->numOfBlocks);
|
||||
|
||||
tsdbInsertData(pRepo, pMsg);
|
||||
}
|
||||
|
||||
tsdbTriggerCommit(pRepo);
|
||||
double etime = getCurTime();
|
||||
|
||||
printf("Spent %f seconds to write %d records\n", etime - stime, nRows);
|
||||
|
||||
|
||||
|
||||
// tsdbTriggerCommit(pRepo);
|
||||
|
||||
}
|
||||
|
||||
TEST(TsdbTest, openRepo) {
|
||||
TEST(TsdbTest, DISABLED_openRepo) {
|
||||
tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0");
|
||||
ASSERT_NE(pRepo, nullptr);
|
||||
}
|
||||
|
||||
TEST(TsdbTest, createFileGroup) {
|
||||
TEST(TsdbTest, DISABLED_createFileGroup) {
|
||||
SFileGroup fGroup;
|
||||
|
||||
ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0);
|
||||
|
|
Loading…
Reference in New Issue