commit
e54e1bae8c
|
@ -253,6 +253,7 @@ static __compar_fn_t getKeyComparator(int32_t keyType) {
|
||||||
comparFn = compareInt32Val;
|
comparFn = compareInt32Val;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_BIGINT:
|
case TSDB_DATA_TYPE_BIGINT:
|
||||||
|
case TSDB_DATA_TYPE_TIMESTAMP:
|
||||||
comparFn = compareInt64Val;
|
comparFn = compareInt64Val;
|
||||||
break;
|
break;
|
||||||
case TSDB_DATA_TYPE_BOOL:
|
case TSDB_DATA_TYPE_BOOL:
|
||||||
|
|
|
@ -75,6 +75,7 @@ typedef struct {
|
||||||
typedef struct {
|
typedef struct {
|
||||||
TSDB_TABLE_TYPE type;
|
TSDB_TABLE_TYPE type;
|
||||||
STableId tableId;
|
STableId tableId;
|
||||||
|
int32_t sversion;
|
||||||
int64_t superUid;
|
int64_t superUid;
|
||||||
STSchema * schema;
|
STSchema * schema;
|
||||||
STSchema * tagSchema;
|
STSchema * tagSchema;
|
||||||
|
|
|
@ -23,8 +23,6 @@
|
||||||
extern "C" {
|
extern "C" {
|
||||||
#endif
|
#endif
|
||||||
|
|
||||||
typedef int32_t file_id_t;
|
|
||||||
|
|
||||||
typedef enum {
|
typedef enum {
|
||||||
TSDB_FILE_TYPE_HEAD, // .head file type
|
TSDB_FILE_TYPE_HEAD, // .head file type
|
||||||
TSDB_FILE_TYPE_DATA, // .data file type
|
TSDB_FILE_TYPE_DATA, // .data file type
|
||||||
|
@ -40,19 +38,33 @@ typedef struct {
|
||||||
} SFileInfo;
|
} SFileInfo;
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
char * fname;
|
int fd;
|
||||||
SFileInfo fInfo;
|
int64_t size; // total size of the file
|
||||||
} SFILE;
|
int64_t tombSize; // unused file size
|
||||||
|
} SFile;
|
||||||
|
|
||||||
// typedef struct {
|
typedef struct {
|
||||||
// int64_t offset;
|
int32_t fileId;
|
||||||
// int64_t skey;
|
SFile fhead;
|
||||||
// int64_t ekey;
|
SFile fdata;
|
||||||
// int16_t numOfBlocks;
|
SFile flast;
|
||||||
// } SDataBlock;
|
} SFileGroup;
|
||||||
|
|
||||||
|
// TSDB file handle
|
||||||
|
typedef struct {
|
||||||
|
int32_t daysPerFile;
|
||||||
|
int32_t keep;
|
||||||
|
int32_t minRowPerFBlock;
|
||||||
|
int32_t maxRowsPerFBlock;
|
||||||
|
SFileGroup fGroup[];
|
||||||
|
} STsdbFileH;
|
||||||
|
|
||||||
#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) <= TSDB_FILE_TYPE_META)
|
#define IS_VALID_TSDB_FILE_TYPE(type) ((type) >= TSDB_FILE_TYPE_HEAD && (type) <= TSDB_FILE_TYPE_META)
|
||||||
|
|
||||||
|
STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock,
|
||||||
|
int32_t maxRowsPerFBlock);
|
||||||
|
void tsdbCloseFile(STsdbFileH *pFileH);
|
||||||
|
|
||||||
char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type);
|
char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type);
|
||||||
|
|
||||||
#ifdef __cplusplus
|
#ifdef __cplusplus
|
||||||
|
|
|
@ -38,6 +38,7 @@ typedef struct STable {
|
||||||
TSDB_TABLE_TYPE type;
|
TSDB_TABLE_TYPE type;
|
||||||
STableId tableId;
|
STableId tableId;
|
||||||
int32_t superUid; // Super table UID
|
int32_t superUid; // Super table UID
|
||||||
|
int32_t sversion;
|
||||||
STSchema * schema;
|
STSchema * schema;
|
||||||
STSchema * tagSchema;
|
STSchema * tagSchema;
|
||||||
SDataRow tagVal;
|
SDataRow tagVal;
|
||||||
|
|
|
@ -14,9 +14,21 @@
|
||||||
*/
|
*/
|
||||||
#include <stdio.h>
|
#include <stdio.h>
|
||||||
#include <stdlib.h>
|
#include <stdlib.h>
|
||||||
|
#include <stdint.h>
|
||||||
#include <string.h>
|
#include <string.h>
|
||||||
|
#include <dirent.h>
|
||||||
|
|
||||||
#include "tsdbFile.h"
|
#include "tsdbFile.h"
|
||||||
|
#include "tglobalcfg.h"
|
||||||
|
|
||||||
|
// int64_t tsMsPerDay[] = {
|
||||||
|
// 86400000L, // TSDB_PRECISION_MILLI
|
||||||
|
// 86400000000L, // TSDB_PRECISION_MICRO
|
||||||
|
// 86400000000000L // TSDB_PRECISION_NANO
|
||||||
|
// };
|
||||||
|
|
||||||
|
#define tsdbGetKeyFileId(key, daysPerFile, precision) ((key) / tsMsPerDay[(precision)] / (daysPerFile))
|
||||||
|
#define tsdbGetMaxNumOfFiles(keep, daysPerFile) ((keep) / (daysPerFile) + 3)
|
||||||
|
|
||||||
typedef struct {
|
typedef struct {
|
||||||
int64_t offset;
|
int64_t offset;
|
||||||
|
@ -71,6 +83,55 @@ const char *tsdbFileSuffix[] = {
|
||||||
".meta" // TSDB_FILE_TYPE_META
|
".meta" // TSDB_FILE_TYPE_META
|
||||||
};
|
};
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Initialize the TSDB file handle
|
||||||
|
*/
|
||||||
|
STsdbFileH *tsdbInitFile(char *dataDir, int32_t daysPerFile, int32_t keep, int32_t minRowsPerFBlock,
|
||||||
|
int32_t maxRowsPerFBlock) {
|
||||||
|
STsdbFileH *pTsdbFileH =
|
||||||
|
(STsdbFileH *)calloc(1, sizeof(STsdbFileH) + sizeof(SFileGroup) * tsdbGetMaxNumOfFiles(keep, daysPerFile));
|
||||||
|
if (pTsdbFileH == NULL) return NULL;
|
||||||
|
|
||||||
|
pTsdbFileH->daysPerFile = daysPerFile;
|
||||||
|
pTsdbFileH->keep = keep;
|
||||||
|
pTsdbFileH->minRowPerFBlock = minRowsPerFBlock;
|
||||||
|
pTsdbFileH->maxRowsPerFBlock = maxRowsPerFBlock;
|
||||||
|
|
||||||
|
// Open the directory to read information of each file
|
||||||
|
DIR *dir = opendir(dataDir);
|
||||||
|
if (dir == NULL) {
|
||||||
|
free(pTsdbFileH);
|
||||||
|
return NULL;
|
||||||
|
}
|
||||||
|
|
||||||
|
struct dirent *dp;
|
||||||
|
char fname[256];
|
||||||
|
while ((dp = readdir(dir)) != NULL) {
|
||||||
|
if (strncmp(dp->d_name, ".", 1) == 0 || strncmp(dp->d_name, "..", 2) == 0) continue;
|
||||||
|
if (true /* check if the file is the .head file */) {
|
||||||
|
int fileId = 0;
|
||||||
|
int vgId = 0;
|
||||||
|
sscanf(dp->d_name, "v%df%d.head", &vgId, &fileId);
|
||||||
|
// TODO
|
||||||
|
|
||||||
|
// Open head file
|
||||||
|
|
||||||
|
// Open data file
|
||||||
|
|
||||||
|
// Open last file
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return pTsdbFileH;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Closet the file handle
|
||||||
|
*/
|
||||||
|
void tsdbCloseFile(STsdbFileH *pFileH) {
|
||||||
|
// TODO
|
||||||
|
}
|
||||||
|
|
||||||
char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type) {
|
char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type) {
|
||||||
if (!IS_VALID_TSDB_FILE_TYPE(type)) return NULL;
|
if (!IS_VALID_TSDB_FILE_TYPE(type)) return NULL;
|
||||||
|
|
||||||
|
@ -80,3 +141,9 @@ char *tsdbGetFileName(char *dirName, char *fname, TSDB_FILE_TYPE type) {
|
||||||
sprintf(fileName, "%s/%s%s", dirName, fname, tsdbFileSuffix[type]);
|
sprintf(fileName, "%s/%s%s", dirName, fname, tsdbFileSuffix[type]);
|
||||||
return fileName;
|
return fileName;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static void tsdbGetKeyRangeOfFileId(int32_t daysPerFile, int8_t precision, int32_t fileId, TSKEY *minKey,
|
||||||
|
TSKEY *maxKey) {
|
||||||
|
*minKey = fileId * daysPerFile * tsMsPerDay[precision];
|
||||||
|
*maxKey = *minKey + daysPerFile * tsMsPerDay[precision] - 1;
|
||||||
|
}
|
|
@ -308,7 +308,6 @@ int tsdbAlterTable(tsdb_repo_t *pRepo, STableCfg *pCfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
int tsdbDropTable(tsdb_repo_t *repo, STableId tableId) {
|
int tsdbDropTable(tsdb_repo_t *repo, STableId tableId) {
|
||||||
// TODO
|
|
||||||
if (repo == NULL) return -1;
|
if (repo == NULL) return -1;
|
||||||
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
STsdbRepo *pRepo = (STsdbRepo *)repo;
|
||||||
|
|
||||||
|
|
|
@ -18,6 +18,7 @@ static int tsdbAddTableIntoMap(STsdbMeta *pMeta, STable *pTable);
|
||||||
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
|
static int tsdbAddTableIntoIndex(STsdbMeta *pMeta, STable *pTable);
|
||||||
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
|
static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable);
|
||||||
static int tsdbEstimateTableEncodeSize(STable *pTable);
|
static int tsdbEstimateTableEncodeSize(STable *pTable);
|
||||||
|
static char * getTupleKey(const void *data);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encode a TSDB table object as a binary content
|
* Encode a TSDB table object as a binary content
|
||||||
|
@ -153,7 +154,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
|
||||||
super->tagSchema = tdDupSchema(pCfg->tagSchema);
|
super->tagSchema = tdDupSchema(pCfg->tagSchema);
|
||||||
super->tagVal = tdDataRowDup(pCfg->tagValues);
|
super->tagVal = tdDataRowDup(pCfg->tagValues);
|
||||||
super->content.pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1,
|
super->content.pIndex = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, sizeof(int64_t), 1,
|
||||||
0, NULL); // Allow duplicate key, no lock
|
0, getTupleKey); // Allow duplicate key, no lock
|
||||||
|
|
||||||
if (super->content.pIndex == NULL) {
|
if (super->content.pIndex == NULL) {
|
||||||
tdFreeSchema(super->schema);
|
tdFreeSchema(super->schema);
|
||||||
|
@ -183,7 +184,7 @@ int32_t tsdbCreateTableImpl(STsdbMeta *pMeta, STableCfg *pCfg) {
|
||||||
table->superUid = -1;
|
table->superUid = -1;
|
||||||
table->schema = tdDupSchema(pCfg->schema);
|
table->schema = tdDupSchema(pCfg->schema);
|
||||||
}
|
}
|
||||||
table->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, 0, 8, 0, 0, NULL);
|
table->content.pData = tSkipListCreate(TSDB_SUPER_TABLE_SL_LEVEL, TSDB_DATA_TYPE_TIMESTAMP, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], 0, 0, getTupleKey);
|
||||||
|
|
||||||
if (newSuper) tsdbAddTableToMeta(pMeta, super);
|
if (newSuper) tsdbAddTableToMeta(pMeta, super);
|
||||||
tsdbAddTableToMeta(pMeta, table);
|
tsdbAddTableToMeta(pMeta, table);
|
||||||
|
@ -320,3 +321,9 @@ static int tsdbEstimateTableEncodeSize(STable *pTable) {
|
||||||
// TODO
|
// TODO
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static char *getTupleKey(const void * data) {
|
||||||
|
SDataRow row = (SDataRow)data;
|
||||||
|
|
||||||
|
return dataRowAt(row, TD_DATA_ROW_HEAD_SIZE);
|
||||||
|
}
|
|
@ -83,7 +83,7 @@ int32_t tsdbInsertMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: make below a function to implement
|
// TODO: make below a function to implement
|
||||||
if (fseek(mfh->fd, info.offset, SEEK_CUR) < 0) {
|
if (lseek(mfh->fd, info.offset, SEEK_CUR) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -114,7 +114,7 @@ int32_t tsdbDeleteMetaRecord(SMetaFile *mfh, int64_t uid) {
|
||||||
// Remove record from file
|
// Remove record from file
|
||||||
|
|
||||||
info.offset = -info.offset;
|
info.offset = -info.offset;
|
||||||
if (fseek(mfh->fd, -info.offset, SEEK_CUR) < 0) {
|
if (lseek(mfh->fd, -info.offset, SEEK_CUR) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,7 +149,7 @@ int32_t tsdbUpdateMetaRecord(SMetaFile *mfh, int64_t uid, void *cont, int32_t co
|
||||||
|
|
||||||
mfh->size += contLen;
|
mfh->size += contLen;
|
||||||
}
|
}
|
||||||
if (fseek(mfh->fd, -info.offset, SEEK_CUR) < 0) {
|
if (lseek(mfh->fd, -info.offset, SEEK_CUR) < 0) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -212,7 +212,7 @@ static int tsdbRestoreFromMetaFile(char *fname, SMetaFile *mfh) {
|
||||||
return -1;
|
return -1;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (fseek(fd, TSDB_META_FILE_HEADER_SIZE, SEEK_SET) < 0) {
|
if (lseek(fd, TSDB_META_FILE_HEADER_SIZE, SEEK_SET) < 0) {
|
||||||
// TODO: deal with the error
|
// TODO: deal with the error
|
||||||
close(fd);
|
close(fd);
|
||||||
return -1;
|
return -1;
|
||||||
|
|
|
@ -48,7 +48,7 @@ TEST(TsdbTest, createRepo) {
|
||||||
|
|
||||||
for (int j = 0; j < schemaNCols(schema); j++) {
|
for (int j = 0; j < schemaNCols(schema); j++) {
|
||||||
if (j == 0) { // Just for timestamp
|
if (j == 0) { // Just for timestamp
|
||||||
tdAppendColVal(row, (void *)(&time), schemaColAt(schema, j));
|
tdAppendColVal(row, (void *)(&ttime), schemaColAt(schema, j));
|
||||||
} else { // For int
|
} else { // For int
|
||||||
int val = 10;
|
int val = 10;
|
||||||
tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j));
|
tdAppendColVal(row, (void *)(&val), schemaColAt(schema, j));
|
||||||
|
@ -61,5 +61,7 @@ TEST(TsdbTest, createRepo) {
|
||||||
pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
|
pMsg->length = pMsg->length + sizeof(SSubmitBlk) + pBlock->len;
|
||||||
|
|
||||||
tsdbInsertData(pRepo, pMsg);
|
tsdbInsertData(pRepo, pMsg);
|
||||||
|
|
||||||
|
int k = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue