Merge pull request #1394 from taosdata/feature/2.0tsdb

Feature/2.0tsdb
This commit is contained in:
slguan 2020-03-21 21:59:17 +08:00 committed by GitHub
commit 388b4482cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
12 changed files with 645 additions and 170 deletions

View File

@ -101,23 +101,13 @@ int tdAppendColVal(SDataRow row, void *value, STColumn *pCol);
void tdDataRowReset(SDataRow row, STSchema *pSchema); void tdDataRowReset(SDataRow row, STSchema *pSchema);
SDataRow tdDataRowDup(SDataRow row); SDataRow tdDataRowDup(SDataRow row);
/* Data column definition // ----------------- Data column structure
* +---------+---------+-----------------------+ typedef struct SDataCol {
* | int32_t | int32_t | | int64_t len;
* +---------+---------+-----------------------+ char data[];
* | len | npoints | data | } SDataCol;
* +---------+---------+-----------------------+
*/
typedef char *SDataCol;
/* Data columns definition void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter);
* +---------+---------+-----------------------+--------+-----------------------+
* | int32_t | int32_t | | | |
* +---------+---------+-----------------------+--------+-----------------------+
* | len | npoints | SDataCol | .... | SDataCol |
* +---------+---------+-----------------------+--------+-----------------------+
*/
typedef char *SDataCols;
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -294,6 +294,16 @@ SDataRow tdDataRowDup(SDataRow row) {
return trow; return trow;
} }
void tdConvertDataRowToCol(SDataCol *cols, STSchema *pSchema, int *iter) {
int row = *iter;
for (int i = 0; i < schemaNCols(pSchema); i++) {
// TODO
}
*iter = row + 1;
}
/** /**
* Return the first part length of a data row for a schema * Return the first part length of a data row for a schema
*/ */

69
src/util/inc/tlist.h Normal file
View File

@ -0,0 +1,69 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_LIST_
#define _TD_LIST_
#ifdef __cplusplus
extern "C" {
#endif
typedef enum { TD_LIST_FORWARD, TD_LIST_BACKWARD } TD_LIST_DIRECTION_T;
typedef struct _list_node {
struct _list_node *next;
struct _list_node *prev;
char data[];
} SListNode;
typedef struct {
struct _list_node *head;
struct _list_node *tail;
int numOfEles;
int eleSize;
} SList;
typedef struct {
SListNode * next;
TD_LIST_DIRECTION_T direction;
} SListIter;
#define listHead(l) (l)->head
#define listTail(l) (l)->tail
#define listNEles(l) (l)->numOfEles
#define listEleSize(l) (l)->eleSize
#define isListEmpty(l) ((l)->numOfEles == 0)
#define listNodeFree(n) free(n);
SList * tdListNew(int eleSize);
void tdListFree(SList *list);
void tdListEmpty(SList *list);
void tdListPrependNode(SList *list, SListNode *node);
void tdListAppendNode(SList *list, SListNode *node);
int tdListPrepend(SList *list, void *data);
int tdListAppend(SList *list, void *data);
SListNode *tdListPopHead(SList *list);
SListNode *tdListPopTail(SList *list);
SListNode *tdListPopNode(SList *list, SListNode *node);
void tdListMove(SList *src, SList *dst);
void tdListNodeGetData(SList *list, SListNode *node, void *target);
void tdListInitIter(SList *list, SListIter *pIter, TD_LIST_DIRECTION_T direction);
SListNode *tdListNext(SListIter *pIter);
#ifdef __cplusplus
}
#endif
#endif

169
src/util/src/tlist.c Normal file
View File

@ -0,0 +1,169 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <stdlib.h>
#include <string.h>
#include "tlist.h"
SList *tdListNew(int eleSize) {
SList *list = (SList *)malloc(sizeof(SList));
if (list == NULL) return NULL;
list->eleSize = eleSize;
list->numOfEles = 0;
list->head = list->tail = NULL;
return list;
}
void tdListEmpty(SList *list) {
SListNode *node = list->head;
while (node) {
list->head = node->next;
free(node);
node = list->head;
}
list->head = list->tail = 0;
list->numOfEles = 0;
}
void tdListFree(SList *list) {
tdListEmpty(list);
free(list);
}
void tdListPrependNode(SList *list, SListNode *node) {
if (list->head == NULL) {
list->head = node;
list->tail = node;
} else {
node->next = list->head;
node->prev = NULL;
list->head->prev = node;
list->head = node;
}
list->numOfEles++;
}
void tdListAppendNode(SList *list, SListNode *node) {
if (list->head == NULL) {
list->head = node;
list->tail = node;
} else {
node->prev = list->tail;
node->next = NULL;
list->tail->next = node;
list->tail = node;
}
list->numOfEles++;
}
int tdListPrepend(SList *list, void *data) {
SListNode *node = (SListNode *)malloc(sizeof(SListNode) + list->eleSize);
if (node == NULL) return -1;
memcpy((void *)(node->data), data, list->eleSize);
tdListPrependNode(list, node);
return 0;
}
int tdListAppend(SList *list, void *data) {
SListNode *node = (SListNode *)malloc(sizeof(SListNode) + list->eleSize);
if (node == NULL) return -1;
memcpy((void *)(node->data), data, list->eleSize);
tdListAppendNode(list, node);
return 0;
}
SListNode *tdListPopHead(SList *list) {
if (list->head == NULL) return NULL;
SListNode *node = list->head;
if (node->next == NULL) {
list->head = NULL;
list->tail = NULL;
} else {
list->head = node->next;
}
list->numOfEles--;
return node;
}
SListNode *tdListPopTail(SList *list) {
if (list->tail == NULL) return NULL;
SListNode *node = list->tail;
if (node->prev == NULL) {
list->head = NULL;
list->tail = NULL;
} else {
list->tail = node->prev;
}
list->numOfEles--;
return node;
}
SListNode *tdListPopNode(SList *list, SListNode *node) {
if (list->head == node) {
list->head = node->next;
}
if (list->tail == node) {
list->tail = node->prev;
}
if (node->prev != NULL) {
node->prev->next = node->next;
}
if (node->next != NULL) {
node->next->prev = node->prev;
}
list->numOfEles--;
return node;
}
// Move all node elements from src to dst, the dst is assumed as an empty list
void tdListMove(SList *src, SList *dst) {
// assert(dst->eleSize == src->eleSize);
dst->numOfEles = src->numOfEles;
dst->head = src->head;
dst->tail = src->tail;
src->numOfEles = 0;
src->head = src->tail = NULL;
}
void tdListNodeGetData(SList *list, SListNode *node, void *target) { memcpy(target, node->data, list->eleSize); }
void tdListInitIter(SList *list, SListIter *pIter, TD_LIST_DIRECTION_T direction) {
pIter->direction = direction;
if (direction == TD_LIST_FORWARD) {
pIter->next = list->head;
} else {
pIter->next = list->tail;
}
}
SListNode *tdListNext(SListIter *pIter) {
SListNode *node = pIter->next;
if (node == NULL) return NULL;
if (pIter->direction == TD_LIST_FORWARD) {
pIter->next = node->next;
} else {
pIter->next = node->prev;
}
return node;
}

View File

@ -58,6 +58,7 @@ int32_t tsdbDropRepo(tsdb_repo_t *repo);
tsdb_repo_t * tsdbOpenRepo(char *tsdbDir); tsdb_repo_t * tsdbOpenRepo(char *tsdbDir);
int32_t tsdbCloseRepo(tsdb_repo_t *repo); int32_t tsdbCloseRepo(tsdb_repo_t *repo);
int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg); int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg);
int32_t tsdbTriggerCommit(tsdb_repo_t *repo);
// --------- TSDB TABLE DEFINITION // --------- TSDB TABLE DEFINITION
typedef struct { typedef struct {

View File

@ -17,45 +17,39 @@
#include <stdint.h> #include <stdint.h>
// #include "cache.h" #include "tlist.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16*1024*1024 /* 16M */ #define TSDB_DEFAULT_CACHE_BLOCK_SIZE 16 * 1024 * 1024 /* 16M */
typedef struct { typedef struct {
int64_t skey; // start key int blockId;
int64_t ekey; // end key int offset;
int32_t numOfRows; // numOfRows int remain;
} STableCacheInfo; int padding;
char data[];
} STsdbCacheBlock;
typedef struct _tsdb_cache_block { typedef struct {
char * pData; int64_t index;
STableCacheInfo * pTableInfo; SList * memPool;
struct _tsdb_cache_block *prev; } STsdbCachePool;
struct _tsdb_cache_block *next;
} STSDBCacheBlock;
// Use a doublely linked list to implement this typedef struct {
typedef struct STSDBCache { int maxBytes;
// Number of blocks the cache is allocated int cacheBlockSize;
int32_t numOfBlocks; STsdbCachePool pool;
STSDBCacheBlock *cacheList; STsdbCacheBlock *curBlock;
void * current; SList * mem;
SList * imem;
} STsdbCache; } STsdbCache;
// ---- Operation on STSDBCacheBlock STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize);
#define TSDB_CACHE_BLOCK_DATA(pBlock) ((pBlock)->pData) void tsdbFreeCache(STsdbCache *pCache);
#define TSDB_CACHE_AVAIL_SPACE(pBlock) ((char *)((pBlock)->pTableInfo) - ((pBlock)->pData)) void * tsdbAllocFromCache(STsdbCache *pCache, int bytes);
#define TSDB_TABLE_INFO_OF_CACHE(pBlock, tableId) ((pBlock)->pTableInfo)[tableId]
#define TSDB_NEXT_CACHE_BLOCK(pBlock) ((pBlock)->next)
#define TSDB_PREV_CACHE_BLOCK(pBlock) ((pBlock)->prev)
STsdbCache *tsdbInitCache(int64_t maxSize);
int32_t tsdbFreeCache(STsdbCache *pCache);
void * tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes);
#ifdef __cplusplus #ifdef __cplusplus
} }

View File

@ -24,10 +24,10 @@ extern "C" {
#endif #endif
typedef enum { typedef enum {
TSDB_FILE_TYPE_HEAD, // .head file type TSDB_FILE_TYPE_HEAD = 0, // .head file type
TSDB_FILE_TYPE_DATA, // .data file type TSDB_FILE_TYPE_DATA, // .data file type
TSDB_FILE_TYPE_LAST, // .last file type TSDB_FILE_TYPE_LAST, // .last file type
TSDB_FILE_TYPE_META // .meta file type TSDB_FILE_TYPE_MAX
} TSDB_FILE_TYPE; } TSDB_FILE_TYPE;
extern const char *tsdbFileSuffix[]; extern const char *tsdbFileSuffix[];
@ -38,16 +38,15 @@ typedef struct {
} SFileInfo; } SFileInfo;
typedef struct { typedef struct {
int fd; int8_t type;
int64_t size; // total size of the file char fname[128];
int64_t tombSize; // unused file size int64_t size; // total size of the file
int64_t tombSize; // unused file size
} SFile; } SFile;
typedef struct { typedef struct {
int32_t fileId; int32_t fileId;
SFile fhead; SFile files[TSDB_FILE_TYPE_MAX];
SFile fdata;
SFile flast;
} SFileGroup; } SFileGroup;
// TSDB file handle // TSDB file handle
@ -56,17 +55,17 @@ typedef struct {
int32_t keep; int32_t keep;
int32_t minRowPerFBlock; int32_t minRowPerFBlock;
int32_t maxRowsPerFBlock; int32_t maxRowsPerFBlock;
int32_t maxTables;
SFileGroup fGroup[]; SFileGroup fGroup[];
} STsdbFileH; } STsdbFileH;
#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) <= TSDB_FILE_TYPE_META) #define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) < TSDB_FILE_TYPE_MAX)
STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock,
int32_t maxRowsPerFBlock); int32_t maxRowsPerFBlock, int32_t maxTables);
void tsdbCloseFile(STsdbFileH *pFileH);
char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type);
void tsdbCloseFile(STsdbFileH *pFileH);
int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif

View File

@ -35,20 +35,21 @@ extern "C" {
// ---------- TSDB TABLE DEFINITION // ---------- TSDB TABLE DEFINITION
typedef struct STable { typedef struct STable {
int8_t type; int8_t type;
STableId tableId; STableId tableId;
int32_t superUid; // Super table UID int32_t superUid; // Super table UID
int32_t sversion; int32_t sversion;
STSchema * schema; STSchema *schema;
STSchema * tagSchema; STSchema *tagSchema;
SDataRow tagVal; SDataRow tagVal;
union { union {
void *pData; // For TSDB_NORMAL_TABLE and TSDB_CHILD_TABLE, it is the skiplist for cache data void *pData; // For TSDB_NORMAL_TABLE and TSDB_CHILD_TABLE, it is the skiplist for cache data
void *pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index void *pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
} content; } content;
void * iData; // Skiplist to commit
void * eventHandler; // TODO void * eventHandler; // TODO
void * streamHandler; // TODO void * streamHandler; // TODO
struct STable *next; // TODO: remove the next struct STable *next; // TODO: remove the next
} STable; } STable;
void * tsdbEncodeTable(STable *pTable, int *contLen); void * tsdbEncodeTable(STable *pTable, int *contLen);

View File

@ -16,22 +16,106 @@
#include "tsdbCache.h" #include "tsdbCache.h"
STsdbCache *tsdbInitCache(int64_t maxSize) { static int tsdbAllocBlockFromPool(STsdbCache *pCache);
STsdbCache *pCacheHandle = (STsdbCache *)malloc(sizeof(STsdbCache)); static void tsdbFreeBlockList(SList *list);
if (pCacheHandle == NULL) {
// TODO : deal with the error STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) {
return NULL; STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache));
if (pCache == NULL) return NULL;
if (cacheBlockSize < 0) cacheBlockSize = TSDB_DEFAULT_CACHE_BLOCK_SIZE;
pCache->maxBytes = maxBytes;
pCache->cacheBlockSize = cacheBlockSize;
int nBlocks = maxBytes / cacheBlockSize + 1;
if (nBlocks <= 1) nBlocks = 2;
STsdbCachePool *pPool = &(pCache->pool);
pPool->index = 0;
pPool->memPool = tdListNew(sizeof(STsdbCacheBlock *));
if (pPool->memPool == NULL) goto _err;
for (int i = 0; i < nBlocks; i++) {
STsdbCacheBlock *pBlock = (STsdbCacheBlock *)malloc(sizeof(STsdbCacheBlock) + cacheBlockSize);
if (pBlock == NULL) {
goto _err;
}
pBlock->offset = 0;
pBlock->remain = cacheBlockSize;
tdListAppend(pPool->memPool, (void *)(&pBlock));
} }
return pCacheHandle; pCache->mem = tdListNew(sizeof(STsdbCacheBlock *));
if (pCache->mem == NULL) goto _err;
pCache->imem = tdListNew(sizeof(STsdbCacheBlock *));
if (pCache->imem == NULL) goto _err;
return pCache;
_err:
tsdbFreeCache(pCache);
return NULL;
} }
int32_t tsdbFreeCache(STsdbCache *pHandle) { return 0; } void tsdbFreeCache(STsdbCache *pCache) {
tsdbFreeBlockList(pCache->imem);
tsdbFreeBlockList(pCache->mem);
tsdbFreeBlockList(pCache->pool.memPool);
free(pCache);
}
void *tsdbAllocFromCache(STsdbCache *pCache, int64_t bytes) { void *tsdbAllocFromCache(STsdbCache *pCache, int bytes) {
// TODO: implement here if (pCache == NULL) return NULL;
void *ptr = malloc(bytes); if (bytes > pCache->cacheBlockSize) return NULL;
if (ptr == NULL) return NULL;
if (isListEmpty(pCache->mem)) {
if (tsdbAllocBlockFromPool(pCache) < 0) {
// TODO: deal with the error
}
}
if (pCache->curBlock->remain < bytes) {
if (tsdbAllocBlockFromPool(pCache) < 0) {
// TODO: deal with the error
}
}
void *ptr = (void *)(pCache->curBlock->data + pCache->curBlock->offset);
pCache->curBlock->offset += bytes;
pCache->curBlock->remain -= bytes;
memset(ptr, 0, bytes);
return ptr; return ptr;
} }
static void tsdbFreeBlockList(SList *list) {
if (list == NULL) return;
SListNode * node = NULL;
STsdbCacheBlock *pBlock = NULL;
while ((node = tdListPopHead(list)) != NULL) {
tdListNodeGetData(list, node, (void *)(&pBlock));
free(pBlock);
listNodeFree(node);
}
tdListFree(list);
}
static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
STsdbCachePool *pPool = &(pCache->pool);
if (listNEles(pPool->memPool) == 0) return -1;
SListNode *node = tdListPopHead(pPool->memPool);
STsdbCacheBlock *pBlock = NULL;
tdListNodeGetData(pPool->memPool, node, (void *)(&pBlock));
pBlock->blockId = pPool->index++;
pBlock->offset = 0;
pBlock->remain = pCache->cacheBlockSize;
tdListAppendNode(pCache->mem, node);
pCache->curBlock = pBlock;
return 0;
}

View File

@ -12,82 +12,183 @@
* You should have received a copy of the GNU Affero General Public License * You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>. * along with this program. If not, see <http://www.gnu.org/licenses/>.
*/ */
#include <dirent.h>
#include <fcntl.h>
#include <stdint.h>
#include <stdio.h> #include <stdio.h>
#include <stdlib.h> #include <stdlib.h>
#include <stdint.h>
#include <string.h> #include <string.h>
#include <dirent.h> #include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>
#include "tsdbFile.h"
#include "tglobalcfg.h" #include "tglobalcfg.h"
#include "tsdbFile.h"
// int64_t tsMsPerDay[] = { #define TSDB_FILE_HEAD_SIZE 512
// 86400000L, // TSDB_PRECISION_MILLI #define TSDB_FILE_DELIMITER 0xF00AFA0F
// 86400000000L, // TSDB_PRECISION_MICRO
// 86400000000000L // TSDB_PRECISION_NANO
// };
#define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile)) #define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
#define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3) #define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
typedef struct { typedef struct {
int32_t len;
int32_t padding; // For padding purpose
int64_t offset; int64_t offset;
} SCompHeader;
typedef struct {
int64_t uid;
int64_t last : 1;
int64_t numOfBlocks : 63;
int32_t delimiter;
} SCompInfo;
typedef struct {
TSKEY keyFirst;
TSKEY keyLast;
int32_t numOfBlocks;
int32_t offset;
} SCompIdx; } SCompIdx;
/**
* if numOfSubBlocks == -1, then the SCompBlock is a sub-block
* if numOfSubBlocks == 1, then the SCompBlock refers to the data block, and offset/len refer to
* the data block offset and length
* if numOfSubBlocks > 1, then the offset/len refer to the offset of the first sub-block in the
* binary
*/
typedef struct { typedef struct {
int64_t last : 1; // If the block in data file or last file
int64_t offset : 63; // Offset of data block or sub-block index depending on numOfSubBlocks
int32_t algorithm : 8; // Compression algorithm
int32_t numOfPoints : 24; // Number of total points
int32_t sversion; // Schema version
int32_t len; // Data block length or nothing
int16_t numOfSubBlocks; // Number of sub-blocks;
int16_t numOfCols;
TSKEY keyFirst; TSKEY keyFirst;
TSKEY keyLast; TSKEY keyLast;
int64_t offset;
int32_t len;
int32_t sversion;
} SCompBlock; } SCompBlock;
typedef struct { typedef struct {
int64_t uid; int32_t delimiter; // For recovery usage
} SBlock; int32_t checksum; // TODO: decide if checksum logic in this file or make it one API
int64_t uid;
int32_t padding; // For padding purpose
int32_t numOfBlocks; // TODO: make the struct padding
SCompBlock blocks[];
} SCompInfo;
// TODO: take pre-calculation into account
typedef struct { typedef struct {
int16_t colId; int16_t colId; // Column ID
int16_t bytes; int16_t len; // Column length
int32_t nNullPoints; int32_t type : 8;
int32_t type:8; int32_t offset : 24;
int32_t offset:24; } SCompCol;
int32_t len;
// fields for pre-aggregate // TODO: Take recover into account
// TODO: pre-aggregation should be seperated typedef struct {
int64_t sum; int32_t delimiter; // For recovery usage
int64_t max; int32_t numOfCols; // For recovery usage
int64_t min; int64_t uid; // For recovery usage
int16_t maxIdx; SCompCol cols[];
int16_t minIdx; } SCompData;
} SField;
const char *tsdbFileSuffix[] = { const char *tsdbFileSuffix[] = {
".head", // TSDB_FILE_TYPE_HEAD ".head", // TSDB_FILE_TYPE_HEAD
".data", // TSDB_FILE_TYPE_DATA ".data", // TSDB_FILE_TYPE_DATA
".last", // TSDB_FILE_TYPE_LAST ".last" // TSDB_FILE_TYPE_LAST
".meta" // TSDB_FILE_TYPE_META
}; };
static int tsdbWriteFileHead(int fd, SFile *pFile) {
char head[TSDB_FILE_HEAD_SIZE] = "\0";
pFile->size += TSDB_FILE_HEAD_SIZE;
// TODO: write version and File statistic to the head
lseek(fd, 0, SEEK_SET);
if (write(fd, head, TSDB_FILE_HEAD_SIZE) < 0) return -1;
return 0;
}
static int tsdbWriteHeadFileIdx(int fd, int maxTables, SFile *pFile) {
int size = sizeof(SCompIdx) * maxTables;
void *buf = calloc(1, size);
if (buf == NULL) return -1;
if (lseek(fd, TSDB_FILE_HEAD_SIZE, SEEK_SET) < 0) {
free(buf);
return -1;
}
if (write(fd, buf, size) < 0) {
free(buf);
return -1;
}
pFile->size += size;
return 0;
}
static int tsdbGetFileName(char *dataDir, int fileId, int8_t type, char *fname) {
if (dataDir == NULL || fname == NULL || !IS_VALID_TSDB_FILE_TYPE(type)) return -1;
sprintf(fname, "%s/f%d%s", dataDir, fileId, tsdbFileSuffix[type]);
return 0;
}
/**
* Create a file and set the SFile object
*/
static int tsdbCreateFile(char *dataDir, int fileId, int8_t type, int maxTables, SFile *pFile) {
memset((void *)pFile, 0, sizeof(SFile));
pFile->type = type;
tsdbGetFileName(dataDir, fileId, type, pFile->fname);
if (access(pFile->fname, F_OK) == 0) {
// File already exists
return -1;
}
int fd = open(pFile->fname, O_WRONLY | O_CREAT, 0755);
if (fd < 0) return -1;
if (type == TSDB_FILE_TYPE_HEAD) {
if (tsdbWriteHeadFileIdx(fd, maxTables, pFile) < 0) {
close(fd);
return -1;
}
}
if (tsdbWriteFileHead(fd, pFile) < 0) {
close(fd);
return -1;
}
close(fd);
return 0;
}
static int tsdbRemoveFile(SFile *pFile) {
if (pFile == NULL) return -1;
return remove(pFile->fname);
}
// Create a file group with fileId and return a SFileGroup object
int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables) {
if (dataDir == NULL || pFGroup == NULL) return -1;
memset((void *)pFGroup, 0, sizeof(SFileGroup));
for (int type = TSDB_FILE_TYPE_HEAD; type < TSDB_FILE_TYPE_MAX; type++) {
if (tsdbCreateFile(dataDir, fileId, type, maxTables, &(pFGroup->files[type])) < 0) {
// TODO: deal with the error here, remove the created files
return -1;
}
}
pFGroup->fileId = fileId;
return 0;
}
/** /**
* Initialize the TSDB file handle * Initialize the TSDB file handle
*/ */
STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock, STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock,
int32_t maxRowsPerFBlock) { int32_t maxRowsPerFBlock, int32_t maxTables) {
STsdbFileH *pTsdbFileH = STsdbFileH *pTsdbFileH =
(STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * tsdbGetMaxNumOfFiles(keep, daysPerFile)); (STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * tsdbGetMaxNumOfFiles(keep, daysPerFile));
if (pTsdbFileH == NULL) return NULL; if (pTsdbFileH == NULL) return NULL;
@ -96,6 +197,7 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32
pTsdbFileH->keep = keep; pTsdbFileH->keep = keep;
pTsdbFileH->minRowPerFBlock = minRowsPerFBlock; pTsdbFileH->minRowPerFBlock = minRowsPerFBlock;
pTsdbFileH->maxRowsPerFBlock = maxRowsPerFBlock; pTsdbFileH->maxRowsPerFBlock = maxRowsPerFBlock;
pTsdbFileH->maxTables = maxTables;
// Open the directory to read information of each file // Open the directory to read information of each file
DIR *dir = opendir(dataDir); DIR *dir = opendir(dataDir);
@ -104,8 +206,9 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32
return NULL; return NULL;
} }
struct dirent *dp;
char fname[256]; char fname[256];
struct dirent *dp;
while ((dp = readdir(dir)) != NULL) { while ((dp = readdir(dir)) != NULL) {
if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue; if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue;
if (true /* check if the file is the .head file */) { if (true /* check if the file is the .head file */) {
@ -125,23 +228,6 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32
return pTsdbFileH; return pTsdbFileH;
} }
/**
* Closet the file handle
*/
void tsdbCloseFile(STsdbFileH *pFileH) {
// TODO
}
char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type) {
if (!IS_VALID_TSDB_FILE_TYPE(type)) return NULL;
char *fileName = (char *)malloc(strlen(dirName) + strlen(fname) + strlen(tsdbFileSuffix[type]) + 5);
if (fileName == NULL) return NULL;
sprintf(fileName, "%s/%s%s", dirName, fname, tsdbFileSuffix[type]);
return fileName;
}
static void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, static void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey,
TSKEY *maxKey) { TSKEY *maxKey) {
*minKey = fileId * daysPerFile * tsMsPerDay[precision]; *minKey = fileId * daysPerFile * tsMsPerDay[precision];

View File

@ -58,13 +58,16 @@ typedef struct _tsdb_repo {
// The cache Handle // The cache Handle
STsdbCache *tsdbCache; STsdbCache *tsdbCache;
// The TSDB file handle
STsdbFileH *tsdbFileH;
// Disk tier handle for multi-tier storage // Disk tier handle for multi-tier storage
void *diskTier; void *diskTier;
// File Store pthread_mutex_t mutex;
void *tsdbFiles;
pthread_mutex_t tsdbMutex; int commit;
pthread_t commitThread;
// A limiter to monitor the resources used by tsdb // A limiter to monitor the resources used by tsdb
void *limiter; void *limiter;
@ -79,6 +82,8 @@ static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo);
static int tsdbOpenMetaFile(char *tsdbDir); static int tsdbOpenMetaFile(char *tsdbDir);
static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock); static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock);
static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg); static int32_t tsdbRestoreCfg(STsdbRepo *pRepo, STsdbCfg *pCfg);
static int32_t tsdbGetDataDirName(STsdbRepo *pRepo, char *fname);
static void * tsdbCommitToFile(void *arg);
#define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid] #define TSDB_GET_TABLE_BY_ID(pRepo, sid) (((STSDBRepo *)pRepo)->pTableList)[sid]
#define TSDB_GET_TABLE_BY_NAME(pRepo, name) #define TSDB_GET_TABLE_BY_NAME(pRepo, name)
@ -162,7 +167,7 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO
pRepo->tsdbMeta = pMeta; pRepo->tsdbMeta = pMeta;
// Initialize cache // Initialize cache
STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize); STsdbCache *pCache = tsdbInitCache(pCfg->maxCacheSize, -1);
if (pCache == NULL) { if (pCache == NULL) {
free(pRepo->rootDir); free(pRepo->rootDir);
tsdbFreeMeta(pRepo->tsdbMeta); tsdbFreeMeta(pRepo->tsdbMeta);
@ -171,6 +176,19 @@ tsdb_repo_t *tsdbCreateRepo(char *rootDir, STsdbCfg *pCfg, void *limiter /* TODO
} }
pRepo->tsdbCache = pCache; pRepo->tsdbCache = pCache;
// Initialize file handle
char dataDir[128] = "\0";
tsdbGetDataDirName(pRepo, dataDir);
pRepo->tsdbFileH =
tsdbInitFile(dataDir, pCfg->daysPerFile, pCfg->keep, pCfg->minRowsPerFileBlock, pCfg->maxRowsPerFileBlock, pCfg->maxTables);
if (pRepo->tsdbFileH == NULL) {
free(pRepo->rootDir);
tsdbFreeCache(pRepo->tsdbCache);
tsdbFreeMeta(pRepo->tsdbMeta);
free(pRepo);
return NULL;
}
pRepo->state = TSDB_REPO_STATE_ACTIVE; pRepo->state = TSDB_REPO_STATE_ACTIVE;
return (tsdb_repo_t *)pRepo; return (tsdb_repo_t *)pRepo;
@ -230,7 +248,7 @@ tsdb_repo_t *tsdbOpenRepo(char *tsdbDir) {
return NULL; return NULL;
} }
pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize); pRepo->tsdbCache = tsdbInitCache(pRepo->config.maxCacheSize, -1);
if (pRepo->tsdbCache == NULL) { if (pRepo->tsdbCache == NULL) {
tsdbFreeMeta(pRepo->tsdbMeta); tsdbFreeMeta(pRepo->tsdbMeta);
free(pRepo->rootDir); free(pRepo->rootDir);
@ -284,6 +302,32 @@ int32_t tsdbConfigRepo(tsdb_repo_t *repo, STsdbCfg *pCfg) {
return 0; return 0;
} }
int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
STsdbRepo *pRepo = (STsdbRepo *)repo;
if (pthread_mutex_lock(&(pRepo->mutex)) < 0) return -1;
if (pRepo->commit) return 0;
pRepo->commit = 1;
// Loop to move pData to iData
for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pRepo->tsdbMeta->tables[i];
if (pTable != NULL) {
void *pData = pTable->content.pData;
pTable->content.pData = NULL;
pTable->iData = pData;
}
}
// Loop to move mem to imem
tdListMove(pRepo->tsdbCache->mem, pRepo->tsdbCache->imem);
pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo);
pthread_mutex_unlock(&(pRepo->mutex));
pthread_join(pRepo->commitThread, NULL);
return 0;
}
/** /**
* Get the TSDB repository information, including some statistics * Get the TSDB repository information, including some statistics
* @param pRepo the TSDB repository handle * @param pRepo the TSDB repository handle
@ -612,9 +656,6 @@ static int32_t tsdbDestroyRepoEnv(STsdbRepo *pRepo) {
rmdir(dirName); rmdir(dirName);
char *metaFname = tsdbGetFileName(pRepo->rootDir, "tsdb", TSDB_FILE_TYPE_META);
remove(metaFname);
return 0; return 0;
} }
@ -663,3 +704,22 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
return 0; return 0;
} }
static void *tsdbCommitToFile(void *arg) {
// TODO
STsdbRepo *pRepo = (STsdbRepo *)arg;
STsdbMeta *pMeta = pRepo->tsdbMeta;
for (int i = 0; i < pRepo->config.maxTables; i++) {
STable *pTable = pMeta->tables[i];
if (pTable == NULL) continue;
SSkipListIterator *pIter = tSkipListCreateIter(pTable->iData);
while (tSkipListIterNext(pIter)) {
SSkipListNode *node = tSkipListIterGet(pIter);
SDataRow row = SL_GET_NODE_DATA(node);
int k = 0;
}
}
return NULL;
}

View File

@ -3,6 +3,7 @@
#include "tsdb.h" #include "tsdb.h"
#include "dataformat.h" #include "dataformat.h"
#include "tsdbFile.h"
#include "tsdbMeta.h" #include "tsdbMeta.h"
TEST(TsdbTest, tableEncodeDecode) { TEST(TsdbTest, tableEncodeDecode) {
@ -71,39 +72,50 @@ TEST(TsdbTest, createRepo) {
tsdbCreateTable(pRepo, &tCfg); tsdbCreateTable(pRepo, &tCfg);
// // 3. Loop to write some simple data // // 3. Loop to write some simple data
int nRows = 10; int nRows = 100;
SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * nRows); int rowsPerSubmit = 10;
SSubmitBlk *pBlock = pMsg->blocks;
pBlock->tableId = {.uid = 987607499877672L, .tid = 0};
pBlock->sversion = 0;
pBlock->len = 0;
int64_t start_time = 1584081000000; int64_t start_time = 1584081000000;
for (int i = 0; i < nRows; i++) {
int64_t ttime = start_time + 1000 * i;
SDataRow row = (SDataRow)(pBlock->data + pBlock->len);
tdInitDataRow(row, schema);
for (int j = 0; j < schemaNCols(schema); j++) { SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * rowsPerSubmit);
if (j == 0) { // Just for timestamp
tdAppendColVal(row, (void *)(&ttime), schemaColAt(schema, j)); for (int k = 0; k < nRows/rowsPerSubmit; k++) {
} else { // For int SSubmitBlk *pBlock = pMsg->blocks;
int val = 10; pBlock->tableId = {.uid = 987607499877672L, .tid = 0};
tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j)); pBlock->sversion = 0;
pBlock->len = 0;
for (int i = 0; i < rowsPerSubmit; i++) {
start_time += 1000;
SDataRow row = (SDataRow)(pBlock->data + pBlock->len);
tdInitDataRow(row, schema);
for (int j = 0; j < schemaNCols(schema); j++) {
if (j == 0) { // Just for timestamp
tdAppendColVal(row, (void *)(&start_time), schemaColAt(schema, j));
} else { // For int
int val = 10;
tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j));
}
} }
pBlock->len += dataRowLen(row);
} }
pBlock->len += dataRowLen(row); pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
tsdbInsertData(pRepo, pMsg);
} }
pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
tsdbInsertData(pRepo, pMsg); tsdbTriggerCommit(pRepo);
int k = 0;
} }
TEST(TsdbTest, openRepo) { TEST(TsdbTest, openRepo) {
tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0"); tsdb_repo_t *pRepo = tsdbOpenRepo("/home/ubuntu/work/ttest/vnode0");
ASSERT_NE(pRepo, nullptr); ASSERT_NE(pRepo, nullptr);
} }
TEST(TsdbTest, createFileGroup) {
SFileGroup fGroup;
ASSERT_EQ(tsdbCreateFileGroup("/home/ubuntu/work/ttest/vnode0/data", 1820, &fGroup, 1000), 0);
int k = 0;
}