commit
8d67c82b72
|
@ -81,11 +81,13 @@ STSchema *tdDecodeSchema(void **psrc);
|
||||||
*/
|
*/
|
||||||
typedef void *SDataRow;
|
typedef void *SDataRow;
|
||||||
|
|
||||||
|
|
||||||
#define TD_DATA_ROW_HEAD_SIZE (2 * sizeof(int32_t))
|
#define TD_DATA_ROW_HEAD_SIZE (2 * sizeof(int32_t))
|
||||||
|
|
||||||
#define dataRowLen(r) (*(int32_t *)(r))
|
#define dataRowLen(r) (*(int32_t *)(r))
|
||||||
#define dataRowFLen(r) (*(int32_t *)((char *)(r) + sizeof(int32_t)))
|
#define dataRowFLen(r) (*(int32_t *)((char *)(r) + sizeof(int32_t)))
|
||||||
#define dataRowTuple(r) ((char *)(r) + TD_DATA_ROW_HEAD_SIZE)
|
#define dataRowTuple(r) ((char *)(r) + TD_DATA_ROW_HEAD_SIZE)
|
||||||
|
#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r)))
|
||||||
#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
|
#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
|
||||||
#define dataRowSetFLen(r, l) (dataRowFLen(r) = (l))
|
#define dataRowSetFLen(r, l) (dataRowFLen(r) = (l))
|
||||||
#define dataRowIdx(r, i) ((char *)(r) + i)
|
#define dataRowIdx(r, i) ((char *)(r) + i)
|
||||||
|
|
|
@ -17,6 +17,7 @@
|
||||||
|
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
|
||||||
|
#include "taosdef.h"
|
||||||
#include "tlist.h"
|
#include "tlist.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
@ -38,18 +39,25 @@ typedef struct {
|
||||||
SList * memPool;
|
SList * memPool;
|
||||||
} STsdbCachePool;
|
} STsdbCachePool;
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
TSKEY keyFirst;
|
||||||
|
TSKEY keyLast;
|
||||||
|
int64_t numOfPoints;
|
||||||
|
SList * list;
|
||||||
|
} SCacheMem;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int maxBytes;
|
int maxBytes;
|
||||||
int cacheBlockSize;
|
int cacheBlockSize;
|
||||||
STsdbCachePool pool;
|
STsdbCachePool pool;
|
||||||
STsdbCacheBlock *curBlock;
|
STsdbCacheBlock *curBlock;
|
||||||
SList * mem;
|
SCacheMem * mem;
|
||||||
SList * imem;
|
SCacheMem * imem;
|
||||||
} STsdbCache;
|
} STsdbCache;
|
||||||
|
|
||||||
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize);
|
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize);
|
||||||
void tsdbFreeCache(STsdbCache *pCache);
|
void tsdbFreeCache(STsdbCache *pCache);
|
||||||
void * tsdbAllocFromCache(STsdbCache *pCache, int bytes);
|
void * tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,11 +18,15 @@
|
||||||
#include <stdint.h>
|
#include <stdint.h>
|
||||||
|
|
||||||
#include "taosdef.h"
|
#include "taosdef.h"
|
||||||
|
#include "tglobalcfg.h"
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
|
#define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
|
||||||
|
#define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_FILE_TYPE_HEAD = 0, // .head file type
|
TSDB_FILE_TYPE_HEAD = 0, // .head file type
|
||||||
TSDB_FILE_TYPE_DATA, // .data file type
|
TSDB_FILE_TYPE_DATA, // .data file type
|
||||||
|
@ -66,6 +70,7 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32
|
||||||
|
|
||||||
void tsdbCloseFile(STsdbFileH *pFileH);
|
void tsdbCloseFile(STsdbFileH *pFileH);
|
||||||
int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables);
|
int tsdbCreateFileGroup(char *dataDir, int fileId, SFileGroup *pFGroup, int maxTables);
|
||||||
|
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey, TSKEY *maxKey);
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
#endif
|
#endif
|
||||||
|
|
|
@ -33,20 +33,25 @@ extern "C" {
|
||||||
|
|
||||||
#define IS_CREATE_STABLE(pCfg) ((pCfg)->tagValues != NULL)
|
#define IS_CREATE_STABLE(pCfg) ((pCfg)->tagValues != NULL)
|
||||||
|
|
||||||
|
typedef struct {
|
||||||
|
TSKEY keyFirst;
|
||||||
|
TSKEY keyLast;
|
||||||
|
int32_t numOfPoints;
|
||||||
|
void * pData;
|
||||||
|
} SMemTable;
|
||||||
|
|
||||||
// ---------- TSDB TABLE DEFINITION
|
// ---------- TSDB TABLE DEFINITION
|
||||||
typedef struct STable {
|
typedef struct STable {
|
||||||
int8_t type;
|
int8_t type;
|
||||||
STableId tableId;
|
STableId tableId;
|
||||||
int32_t superUid; // Super table UID
|
int32_t superUid; // Super table UID
|
||||||
int32_t sversion;
|
int32_t sversion;
|
||||||
STSchema *schema;
|
STSchema * schema;
|
||||||
STSchema *tagSchema;
|
STSchema * tagSchema;
|
||||||
SDataRow tagVal;
|
SDataRow tagVal;
|
||||||
union {
|
SMemTable * mem;
|
||||||
void *pData; // For TSDB_NORMAL_TABLE and TSDB_CHILD_TABLE, it is the skiplist for cache data
|
SMemTable * imem;
|
||||||
void *pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
|
void * pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index
|
||||||
} content;
|
|
||||||
void * iData; // Skiplist to commit
|
|
||||||
void * eventHandler; // TODO
|
void * eventHandler; // TODO
|
||||||
void * streamHandler; // TODO
|
void * streamHandler; // TODO
|
||||||
struct STable *next; // TODO: remove the next
|
struct STable *next; // TODO: remove the next
|
||||||
|
@ -69,6 +74,8 @@ typedef struct {
|
||||||
void *map; // table map of (uid ===> table)
|
void *map; // table map of (uid ===> table)
|
||||||
|
|
||||||
SMetaFile *mfh; // meta file handle
|
SMetaFile *mfh; // meta file handle
|
||||||
|
int maxRowBytes;
|
||||||
|
int maxCols;
|
||||||
} STsdbMeta;
|
} STsdbMeta;
|
||||||
|
|
||||||
STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables);
|
STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables);
|
||||||
|
@ -94,8 +101,9 @@ int32_t tsdbFreeMeta(STsdbMeta *pMeta);
|
||||||
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg);
|
int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg);
|
||||||
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId);
|
int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId);
|
||||||
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
|
STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId);
|
||||||
int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable);
|
// int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable);
|
||||||
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid);
|
STable *tsdbGetTableByUid(STsdbMeta *pMeta, int64_t uid);
|
||||||
|
char *getTupleKey(const void * data);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@
|
||||||
#include "tsdbCache.h"
|
#include "tsdbCache.h"
|
||||||
|
|
||||||
static int tsdbAllocBlockFromPool(STsdbCache *pCache);
|
static int tsdbAllocBlockFromPool(STsdbCache *pCache);
|
||||||
static void tsdbFreeBlockList(SList *list);
|
static void tsdbFreeBlockList(SCacheMem *mem);
|
||||||
|
|
||||||
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) {
|
STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) {
|
||||||
STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache));
|
STsdbCache *pCache = (STsdbCache *)calloc(1, sizeof(STsdbCache));
|
||||||
|
@ -46,11 +46,8 @@ STsdbCache *tsdbInitCache(int maxBytes, int cacheBlockSize) {
|
||||||
tdListAppend(pPool->memPool, (void *)(&pBlock));
|
tdListAppend(pPool->memPool, (void *)(&pBlock));
|
||||||
}
|
}
|
||||||
|
|
||||||
pCache->mem = tdListNew(sizeof(STsdbCacheBlock *));
|
pCache->mem = NULL;
|
||||||
if (pCache->mem == NULL) goto _err;
|
pCache->imem = NULL;
|
||||||
|
|
||||||
pCache->imem = tdListNew(sizeof(STsdbCacheBlock *));
|
|
||||||
if (pCache->imem == NULL) goto _err;
|
|
||||||
|
|
||||||
return pCache;
|
return pCache;
|
||||||
|
|
||||||
|
@ -66,11 +63,20 @@ void tsdbFreeCache(STsdbCache *pCache) {
|
||||||
free(pCache);
|
free(pCache);
|
||||||
}
|
}
|
||||||
|
|
||||||
void *tsdbAllocFromCache(STsdbCache *pCache, int bytes) {
|
void *tsdbAllocFromCache(STsdbCache *pCache, int bytes, TSKEY key) {
|
||||||
if (pCache == NULL) return NULL;
|
if (pCache == NULL) return NULL;
|
||||||
if (bytes > pCache->cacheBlockSize) return NULL;
|
if (bytes > pCache->cacheBlockSize) return NULL;
|
||||||
|
|
||||||
if (isListEmpty(pCache->mem)) {
|
if (pCache->mem == NULL) { // Create a new one
|
||||||
|
pCache->mem = (SCacheMem *)malloc(sizeof(SCacheMem));
|
||||||
|
if (pCache->mem == NULL) return NULL;
|
||||||
|
pCache->mem->keyFirst = INT64_MAX;
|
||||||
|
pCache->mem->keyLast = 0;
|
||||||
|
pCache->mem->numOfPoints = 0;
|
||||||
|
pCache->mem->list = tdListNew(sizeof(STsdbCacheBlock *));
|
||||||
|
}
|
||||||
|
|
||||||
|
if (isListEmpty(pCache->mem->list)) {
|
||||||
if (tsdbAllocBlockFromPool(pCache) < 0) {
|
if (tsdbAllocBlockFromPool(pCache) < 0) {
|
||||||
// TODO: deal with the error
|
// TODO: deal with the error
|
||||||
}
|
}
|
||||||
|
@ -86,12 +92,16 @@ void *tsdbAllocFromCache(STsdbCache *pCache, int bytes) {
|
||||||
pCache->curBlock->offset += bytes;
|
pCache->curBlock->offset += bytes;
|
||||||
pCache->curBlock->remain -= bytes;
|
pCache->curBlock->remain -= bytes;
|
||||||
memset(ptr, 0, bytes);
|
memset(ptr, 0, bytes);
|
||||||
|
if (key < pCache->mem->keyFirst) pCache->mem->keyFirst = key;
|
||||||
|
if (key > pCache->mem->keyLast) pCache->mem->keyLast = key;
|
||||||
|
pCache->mem->numOfPoints++;
|
||||||
|
|
||||||
return ptr;
|
return ptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbFreeBlockList(SList *list) {
|
static void tsdbFreeBlockList(SCacheMem *mem) {
|
||||||
if (list == NULL) return;
|
if (mem == NULL) return;
|
||||||
|
SList * list = mem->list;
|
||||||
SListNode * node = NULL;
|
SListNode * node = NULL;
|
||||||
STsdbCacheBlock *pBlock = NULL;
|
STsdbCacheBlock *pBlock = NULL;
|
||||||
while ((node = tdListPopHead(list)) != NULL) {
|
while ((node = tdListPopHead(list)) != NULL) {
|
||||||
|
@ -100,6 +110,7 @@ static void tsdbFreeBlockList(SList *list) {
|
||||||
listNodeFree(node);
|
listNodeFree(node);
|
||||||
}
|
}
|
||||||
tdListFree(list);
|
tdListFree(list);
|
||||||
|
free(mem);
|
||||||
}
|
}
|
||||||
|
|
||||||
static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
|
static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
|
||||||
|
@ -114,7 +125,7 @@ static int tsdbAllocBlockFromPool(STsdbCache *pCache) {
|
||||||
pBlock->offset = 0;
|
pBlock->offset = 0;
|
||||||
pBlock->remain = pCache->cacheBlockSize;
|
pBlock->remain = pCache->cacheBlockSize;
|
||||||
|
|
||||||
tdListAppendNode(pCache->mem, node);
|
tdListAppendNode(pCache->mem->list, node);
|
||||||
pCache->curBlock = pBlock;
|
pCache->curBlock = pBlock;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
|
|
@ -22,15 +22,11 @@
|
||||||
#include <sys/types.h>
|
#include <sys/types.h>
|
||||||
#include <unistd.h>
|
#include <unistd.h>
|
||||||
|
|
||||||
#include "tglobalcfg.h"
|
|
||||||
#include "tsdbFile.h"
|
#include "tsdbFile.h"
|
||||||
|
|
||||||
#define TSDB_FILE_HEAD_SIZE 512
|
#define TSDB_FILE_HEAD_SIZE 512
|
||||||
#define TSDB_FILE_DELIMITER 0xF00AFA0F
|
#define TSDB_FILE_DELIMITER 0xF00AFA0F
|
||||||
|
|
||||||
#define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
|
|
||||||
#define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
|
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int32_t len;
|
int32_t len;
|
||||||
int32_t padding; // For padding purpose
|
int32_t padding; // For padding purpose
|
||||||
|
@ -228,7 +224,7 @@ STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32
|
||||||
return pTsdbFileH;
|
return pTsdbFileH;
|
||||||
}
|
}
|
||||||
|
|
||||||
static void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey,
|
void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey,
|
||||||
TSKEY *maxKey) {
|
TSKEY *maxKey) {
|
||||||
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
|
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
|
||||||
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
|
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
|
||||||
|
|
|
@ -44,6 +44,7 @@
|
||||||
|
|
||||||
#define TSDB_CFG_FILE_NAME "CONFIG"
|
#define TSDB_CFG_FILE_NAME "CONFIG"
|
||||||
#define TSDB_DATA_DIR_NAME "data"
|
#define TSDB_DATA_DIR_NAME "data"
|
||||||
|
#define TSDB_DEFAULT_FILE_BLOCK_ROW_OPTION 0.7
|
||||||
|
|
||||||
enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURING };
|
enum { TSDB_REPO_STATE_ACTIVE, TSDB_REPO_STATE_CLOSED, TSDB_REPO_STATE_CONFIGURING };
|
||||||
|
|
||||||
|
@ -311,14 +312,14 @@ int32_t tsdbTriggerCommit(tsdb_repo_t *repo) {
|
||||||
// Loop to move pData to iData
|
// Loop to move pData to iData
|
||||||
for (int i = 0; i < pRepo->config.maxTables; i++) {
|
for (int i = 0; i < pRepo->config.maxTables; i++) {
|
||||||
STable *pTable = pRepo->tsdbMeta->tables[i];
|
STable *pTable = pRepo->tsdbMeta->tables[i];
|
||||||
if (pTable != NULL) {
|
if (pTable != NULL && pTable->mem != NULL) {
|
||||||
void *pData = pTable->content.pData;
|
pTable->imem = pTable->mem;
|
||||||
pTable->content.pData = NULL;
|
pTable->mem = NULL;
|
||||||
pTable->iData = pData;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// Loop to move mem to imem
|
// TODO: Loop to move mem to imem
|
||||||
tdListMove(pRepo->tsdbCache->mem, pRepo->tsdbCache->imem);
|
pRepo->tsdbCache->imem = pRepo->tsdbCache->mem;
|
||||||
|
pRepo->tsdbCache->mem = NULL;
|
||||||
|
|
||||||
pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo);
|
pthread_create(&(pRepo->commitThread), NULL, tsdbCommitToFile, (void *)repo);
|
||||||
pthread_mutex_unlock(&(pRepo->mutex));
|
pthread_mutex_unlock(&(pRepo->mutex));
|
||||||
|
@ -669,10 +670,19 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
|
||||||
int32_t level = 0;
|
int32_t level = 0;
|
||||||
int32_t headSize = 0;
|
int32_t headSize = 0;
|
||||||
|
|
||||||
tSkipListRandNodeInfo(pTable->content.pData, &level, &headSize);
|
if (pTable->mem == NULL) {
|
||||||
|
pTable->mem = (SMemTable *)calloc(1, sizeof(SMemTable));
|
||||||
|
if (pTable->mem == NULL) return -1;
|
||||||
|
pTable->mem->pData = tSkipListCreate(5, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey);
|
||||||
|
pTable->mem->keyFirst = INT64_MAX;
|
||||||
|
pTable->mem->keyLast = 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
tSkipListRandNodeInfo(pTable->mem->pData, &level, &headSize);
|
||||||
|
|
||||||
|
TSKEY key = dataRowKey(row);
|
||||||
// Copy row into the memory
|
// Copy row into the memory
|
||||||
SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row));
|
SSkipListNode *pNode = tsdbAllocFromCache(pRepo->tsdbCache, headSize + dataRowLen(row), key);
|
||||||
if (pNode == NULL) {
|
if (pNode == NULL) {
|
||||||
// TODO: deal with allocate failure
|
// TODO: deal with allocate failure
|
||||||
}
|
}
|
||||||
|
@ -681,7 +691,10 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
|
||||||
dataRowCpy(SL_GET_NODE_DATA(pNode), row);
|
dataRowCpy(SL_GET_NODE_DATA(pNode), row);
|
||||||
|
|
||||||
// Insert the skiplist node into the data
|
// Insert the skiplist node into the data
|
||||||
tsdbInsertRowToTableImpl(pNode, pTable);
|
tSkipListPut(pTable->mem->pData, pNode);
|
||||||
|
if (key > pTable->mem->keyLast) pTable->mem->keyLast = key;
|
||||||
|
if (key < pTable->mem->keyFirst) pTable->mem->keyFirst = key;
|
||||||
|
pTable->mem->numOfPoints++;
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -705,21 +718,60 @@ static int32_t tsdbInsertDataToTable(tsdb_repo_t *repo, SSubmitBlk *pBlock) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static int tsdbReadRowsFromCache(SSkipListIterator *pIter, TSKEY minKey, TSKEY maxKey, int maxRowsToRead, void *dst) {
|
||||||
|
int numOfRows = 0;
|
||||||
|
do {
|
||||||
|
SSkipListNode *node = tSkiplistIterGet(pIter);
|
||||||
|
SDataRow row = SL_GET_NODE_DATA(node);
|
||||||
|
if (dataRowKey(row) > maxKey) break;
|
||||||
|
} while (tSkipListIterNext(pIter));
|
||||||
|
return numOfRows;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Commit to file
|
||||||
static void *tsdbCommitToFile(void *arg) {
|
static void *tsdbCommitToFile(void *arg) {
|
||||||
// TODO
|
// TODO
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)arg;
|
STsdbRepo * pRepo = (STsdbRepo *)arg;
|
||||||
STsdbMeta *pMeta = pRepo->tsdbMeta;
|
STsdbMeta * pMeta = pRepo->tsdbMeta;
|
||||||
for (int i = 0; i < pRepo->config.maxTables; i++) {
|
STsdbCache *pCache = pRepo->tsdbCache;
|
||||||
STable *pTable = pMeta->tables[i];
|
STsdbRepo * pCfg = &(pRepo->config);
|
||||||
if (pTable == NULL) continue;
|
if (pCache->imem == NULL) return;
|
||||||
SSkipListIterator *pIter = tSkipListCreateIter(pTable->iData);
|
|
||||||
while (tSkipListIterNext(pIter)) {
|
|
||||||
SSkipListNode *node = tSkipListIterGet(pIter);
|
|
||||||
SDataRow row = SL_GET_NODE_DATA(node);
|
|
||||||
int k = 0;
|
|
||||||
|
|
||||||
|
int sfid = tsdbGetKeyFileId(pCache->imem->keyFirst);
|
||||||
|
int efid = tsdbGetKeyFileId(pCache->imem->keyLast);
|
||||||
|
SSkipListIterator **iters = (SSkipListIterator **)calloc(pCfg->maxTables, sizeof(SSkipListIterator *));
|
||||||
|
if (iters == NULL) {
|
||||||
|
// TODO: deal with the error
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
for (int fid = sfid; fid <= efid; fid++) {
|
||||||
|
TSKEY minKey = 0, maxKey = 0;
|
||||||
|
tsdbGetKeyRangeOfFileId(pCfg->daysPerFile, pCfg->precision, fid, &minKey, &maxKey);
|
||||||
|
|
||||||
|
for (int tid = 0; tid < pCfg->maxTables; tid++) {
|
||||||
|
STable *pTable = pMeta->tables[tid];
|
||||||
|
if (pTable == NULL || pTable->imem == NULL) continue;
|
||||||
|
if (iters[tid] == NULL) { // create table iterator
|
||||||
|
iters[tid] = tSkipListCreateIter(pTable->imem);
|
||||||
|
// TODO: deal with the error
|
||||||
|
if (iters[tid] == NULL) break;
|
||||||
|
if (!tSkipListIterNext(iters[tid])) {
|
||||||
|
// assert(0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Loop the iterator
|
||||||
|
// tsdbReadRowsFromCache();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Free the iterator
|
||||||
|
for (int tid = 0; tid < pCfg->maxTables; tid++) {
|
||||||
|
if (iters[tid] != NULL) tSkipListDestroyIter(iters[tid]);
|
||||||
|
}
|
||||||
|
|
||||||
|
free(iters);
|
||||||
|
|
||||||
return NULL;
|
return NULL;
|
||||||
}
|
}
|
|
@ -18,7 +18,6 @@ static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable);
|
||||||
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
|
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
|
||||||
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
|
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
|
||||||
static int tsdbEstimateTableEncodeSize(STable *pTable);
|
static int tsdbEstimateTableEncodeSize(STable *pTable);
|
||||||
static char * getTupleKey(const void *data);
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encode a TSDB table object as a binary content
|
* Encode a TSDB table object as a binary content
|
||||||
|
@ -102,11 +101,8 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
|
||||||
if (pTable == NULL) return -1;
|
if (pTable == NULL) return -1;
|
||||||
|
|
||||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||||
pTable->content.pIndex =
|
pTable->pIndex =
|
||||||
tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, getTupleKey);
|
tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1, 0, getTupleKey);
|
||||||
} else {
|
|
||||||
pTable->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP,
|
|
||||||
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
tsdbAddTableToMeta(pMeta, pTable, false);
|
tsdbAddTableToMeta(pMeta, pTable, false);
|
||||||
|
@ -137,6 +133,8 @@ STsdbMeta *tsdbInitMeta(const char *rootDir, int32_t maxTables) {
|
||||||
pMeta->nTables = 0;
|
pMeta->nTables = 0;
|
||||||
pMeta->superList = NULL;
|
pMeta->superList = NULL;
|
||||||
pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *));
|
pMeta->tables = (STable **)calloc(maxTables, sizeof(STable *));
|
||||||
|
pMeta->maxRowBytes = 0;
|
||||||
|
pMeta->maxCols = 0;
|
||||||
if (pMeta->tables == NULL) {
|
if (pMeta->tables == NULL) {
|
||||||
free(pMeta);
|
free(pMeta);
|
||||||
return NULL;
|
return NULL;
|
||||||
|
@ -208,10 +206,10 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
|
||||||
super->schema = tdDupSchema(pCfg->schema);
|
super->schema = tdDupSchema(pCfg->schema);
|
||||||
super->tagSchema = tdDupSchema(pCfg->tagSchema);
|
super->tagSchema = tdDupSchema(pCfg->tagSchema);
|
||||||
super->tagVal = tdDataRowDup(pCfg->tagValues);
|
super->tagVal = tdDataRowDup(pCfg->tagValues);
|
||||||
super->content.pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1,
|
super->pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1,
|
||||||
0, getTupleKey); // Allow duplicate key, no lock
|
0, getTupleKey); // Allow duplicate key, no lock
|
||||||
|
|
||||||
if (super->content.pIndex == NULL) {
|
if (super->pIndex == NULL) {
|
||||||
tdFreeSchema(super->schema);
|
tdFreeSchema(super->schema);
|
||||||
tdFreeSchema(super->tagSchema);
|
tdFreeSchema(super->tagSchema);
|
||||||
tdFreeDataRow(super->tagVal);
|
tdFreeDataRow(super->tagVal);
|
||||||
|
@ -223,7 +221,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
STable *table = (STable *)malloc(sizeof(STable));
|
STable *table = (STable *)calloc(1, sizeof(STable));
|
||||||
if (table == NULL) {
|
if (table == NULL) {
|
||||||
if (newSuper) tsdbFreeTable(super);
|
if (newSuper) tsdbFreeTable(super);
|
||||||
return -1;
|
return -1;
|
||||||
|
@ -239,7 +237,6 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
|
||||||
table->superUid = -1;
|
table->superUid = -1;
|
||||||
table->schema = tdDupSchema(pCfg->schema);
|
table->schema = tdDupSchema(pCfg->schema);
|
||||||
}
|
}
|
||||||
table->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey);
|
|
||||||
|
|
||||||
// Register to meta
|
// Register to meta
|
||||||
if (newSuper) tsdbAddTableToMeta(pMeta, super, true);
|
if (newSuper) tsdbAddTableToMeta(pMeta, super, true);
|
||||||
|
@ -299,10 +296,10 @@ int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) {
|
// int32_t tsdbInsertRowToTableImpl(SSkipListNode *pNode, STable *pTable) {
|
||||||
tSkipListPut(pTable->content.pData, pNode);
|
// tSkipListPut(pTable->mem->pData, pNode);
|
||||||
return 0;
|
// return 0;
|
||||||
}
|
// }
|
||||||
|
|
||||||
static int tsdbFreeTable(STable *pTable) {
|
static int tsdbFreeTable(STable *pTable) {
|
||||||
// TODO: finish this function
|
// TODO: finish this function
|
||||||
|
@ -314,9 +311,7 @@ static int tsdbFreeTable(STable *pTable) {
|
||||||
|
|
||||||
// Free content
|
// Free content
|
||||||
if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) {
|
if (TSDB_TABLE_IS_SUPER_TABLE(pTable)) {
|
||||||
tSkipListDestroy(pTable->content.pIndex);
|
tSkipListDestroy(pTable->pIndex);
|
||||||
} else {
|
|
||||||
tSkipListDestroy(pTable->content.pData);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
free(pTable);
|
free(pTable);
|
||||||
|
@ -404,7 +399,7 @@ static int tsdbEstimateTableEncodeSize(STable *pTable) {
|
||||||
return size;
|
return size;
|
||||||
}
|
}
|
||||||
|
|
||||||
static char *getTupleKey(const void * data) {
|
char *getTupleKey(const void * data) {
|
||||||
SDataRow row = (SDataRow)data;
|
SDataRow row = (SDataRow)data;
|
||||||
|
|
||||||
return dataRowAt(row, TD_DATA_ROW_HEAD_SIZE);
|
return dataRowAt(row, TD_DATA_ROW_HEAD_SIZE);
|
||||||
|
|
|
@ -40,7 +40,6 @@ TEST(TsdbTest, tableEncodeDecode) {
|
||||||
ASSERT_EQ(pTable->superUid, tTable->superUid);
|
ASSERT_EQ(pTable->superUid, tTable->superUid);
|
||||||
ASSERT_EQ(pTable->sversion, tTable->sversion);
|
ASSERT_EQ(pTable->sversion, tTable->sversion);
|
||||||
ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0);
|
ASSERT_EQ(memcmp(pTable->schema, tTable->schema, sizeof(STSchema) + sizeof(STColumn) * nCols), 0);
|
||||||
ASSERT_EQ(tTable->content.pData, nullptr);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST(TsdbTest, createRepo) {
|
TEST(TsdbTest, createRepo) {
|
||||||
|
@ -72,7 +71,7 @@ TEST(TsdbTest, createRepo) {
|
||||||
tsdbCreateTable(pRepo, &tCfg);
|
tsdbCreateTable(pRepo, &tCfg);
|
||||||
|
|
||||||
// // 3. Loop to write some simple data
|
// // 3. Loop to write some simple data
|
||||||
int nRows = 100;
|
int nRows = 1000;
|
||||||
int rowsPerSubmit = 10;
|
int rowsPerSubmit = 10;
|
||||||
int64_t start_time = 1584081000000;
|
int64_t start_time = 1584081000000;
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue