TD-27
This commit is contained in:
parent
04e0fa8047
commit
31705da49b
|
@ -51,19 +51,21 @@ void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes);
|
|||
// ----------------- TSDB SCHEMA DEFINITION
|
||||
typedef struct {
|
||||
int numOfCols; // Number of columns appended
|
||||
int totalCols; // Total columns allocated
|
||||
int padding; // Total columns allocated
|
||||
STColumn columns[];
|
||||
} STSchema;
|
||||
|
||||
#define schemaNCols(s) ((s)->numOfCols)
|
||||
#define schemaTCols(s) ((s)->totalCols)
|
||||
#define schemaColAt(s, i) ((s)->columns + i)
|
||||
|
||||
STSchema *tdNewSchema(int32_t nCols);
|
||||
int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int16_t bytes);
|
||||
int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes);
|
||||
STSchema *tdDupSchema(STSchema *pSchema);
|
||||
void tdFreeSchema(STSchema *pSchema);
|
||||
void tdUpdateSchema(STSchema *pSchema);
|
||||
int tdGetSchemaEncodeSize(STSchema *pSchema);
|
||||
void * tdEncodeSchema(void *dst, STSchema *pSchema);
|
||||
STSchema *tdDecodeSchema(void **psrc);
|
||||
|
||||
// ----------------- Data row structure
|
||||
|
||||
|
@ -99,33 +101,6 @@ int tdAppendColVal(SDataRow row, void *value, STColumn *pCol);
|
|||
void tdDataRowReset(SDataRow row, STSchema *pSchema);
|
||||
SDataRow tdDataRowDup(SDataRow row);
|
||||
|
||||
/* Data rows definition, the format of it is like below:
|
||||
* +---------+-----------------------+--------+-----------------------+
|
||||
* | int32_t | | | |
|
||||
* +---------+-----------------------+--------+-----------------------+
|
||||
* | len | SDataRow | .... | SDataRow |
|
||||
* +---------+-----------------------+--------+-----------------------+
|
||||
*/
|
||||
typedef void *SDataRows;
|
||||
|
||||
#define TD_DATA_ROWS_HEAD_LEN sizeof(int32_t)
|
||||
|
||||
#define dataRowsLen(rs) (*(int32_t *)(rs))
|
||||
#define dataRowsSetLen(rs, l) (dataRowsLen(rs) = (l))
|
||||
#define dataRowsInit(rs) dataRowsSetLen(rs, sizeof(int32_t))
|
||||
|
||||
void tdDataRowsAppendRow(SDataRows rows, SDataRow row);
|
||||
|
||||
// Data rows iterator
|
||||
typedef struct {
|
||||
int32_t totalLen;
|
||||
int32_t len;
|
||||
SDataRow row;
|
||||
} SDataRowsIter;
|
||||
|
||||
void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter);
|
||||
SDataRow tdDataRowsNext(SDataRowsIter *pIter);
|
||||
|
||||
/* Data column definition
|
||||
* +---------+---------+-----------------------+
|
||||
* | int32_t | int32_t | |
|
||||
|
|
|
@ -94,7 +94,6 @@ STSchema *tdNewSchema(int32_t nCols) {
|
|||
STSchema *pSchema = (STSchema *)malloc(size);
|
||||
if (pSchema == NULL) return NULL;
|
||||
pSchema->numOfCols = 0;
|
||||
pSchema->totalCols = nCols;
|
||||
|
||||
return pSchema;
|
||||
}
|
||||
|
@ -102,8 +101,8 @@ STSchema *tdNewSchema(int32_t nCols) {
|
|||
/**
|
||||
* Append a column to the schema
|
||||
*/
|
||||
int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int16_t bytes) {
|
||||
if (pSchema->numOfCols >= pSchema->totalCols) return -1;
|
||||
int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) {
|
||||
// if (pSchema->numOfCols >= pSchema->totalCols) return -1;
|
||||
if (!isValidDataType(type, 0)) return -1;
|
||||
|
||||
STColumn *pCol = schemaColAt(pSchema, schemaNCols(pSchema));
|
||||
|
@ -159,6 +158,53 @@ void tdUpdateSchema(STSchema *pSchema) {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the size of encoded schema
|
||||
*/
|
||||
int tdGetSchemaEncodeSize(STSchema *pSchema) {
|
||||
return sizeof(STSchema) + schemaNCols(pSchema) * (T_MEMBER_SIZE(STColumn, type) + T_MEMBER_SIZE(STColumn, colId) +
|
||||
T_MEMBER_SIZE(STColumn, bytes));
|
||||
}
|
||||
|
||||
/**
|
||||
* Encode a schema to dst, and return the next pointer
|
||||
*/
|
||||
void *tdEncodeSchema(void *dst, STSchema *pSchema) {
|
||||
T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols);
|
||||
for (int i = 0; i < schemaNCols(pSchema); i++) {
|
||||
STColumn *pCol = schemaColAt(pSchema, i);
|
||||
T_APPEND_MEMBER(dst, pCol, STColumn, type);
|
||||
T_APPEND_MEMBER(dst, pCol, STColumn, colId);
|
||||
T_APPEND_MEMBER(dst, pCol, STColumn, bytes);
|
||||
}
|
||||
|
||||
return dst;
|
||||
}
|
||||
|
||||
/**
|
||||
* Decode a schema from a binary.
|
||||
*/
|
||||
STSchema *tdDecodeSchema(void **psrc) {
|
||||
int numOfCols = 0;
|
||||
|
||||
T_READ_MEMBER(*psrc, int, numOfCols);
|
||||
|
||||
STSchema *pSchema = tdNewSchema(numOfCols);
|
||||
if (pSchema == NULL) return NULL;
|
||||
for (int i = 0; i < numOfCols; i++) {
|
||||
int8_t type = 0;
|
||||
int16_t colId = 0;
|
||||
int32_t bytes = 0;
|
||||
T_READ_MEMBER(*psrc, int8_t, type);
|
||||
T_READ_MEMBER(*psrc, int16_t, colId);
|
||||
T_READ_MEMBER(*psrc, int32_t, bytes);
|
||||
|
||||
tdSchemaAppendCol(pSchema, type, colId, bytes);
|
||||
}
|
||||
|
||||
return pSchema;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize a data row
|
||||
*/
|
||||
|
@ -234,6 +280,8 @@ int tdAppendColVal(SDataRow row, void *value, STColumn *pCol) {
|
|||
dataRowFLen(row) += TYPE_BYTES[colType(pCol)];
|
||||
break;
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tdDataRowReset(SDataRow row, STSchema *pSchema) { tdInitDataRow(row, pSchema); }
|
||||
|
@ -246,40 +294,6 @@ SDataRow tdDataRowDup(SDataRow row) {
|
|||
return trow;
|
||||
}
|
||||
|
||||
void tdDataRowsAppendRow(SDataRows rows, SDataRow row) {
|
||||
dataRowCpy((void *)((char *)rows + dataRowsLen(rows)), row);
|
||||
dataRowsSetLen(rows, dataRowsLen(rows) + dataRowLen(row));
|
||||
}
|
||||
|
||||
// Initialize the iterator
|
||||
void tdInitSDataRowsIter(SDataRows rows, SDataRowsIter *pIter) {
|
||||
if (pIter == NULL) return;
|
||||
pIter->totalLen = dataRowsLen(rows);
|
||||
|
||||
if (pIter->totalLen == TD_DATA_ROWS_HEAD_LEN) {
|
||||
pIter->row = NULL;
|
||||
return;
|
||||
}
|
||||
|
||||
pIter->row = (SDataRow)((char *)rows + TD_DATA_ROWS_HEAD_LEN);
|
||||
pIter->len = TD_DATA_ROWS_HEAD_LEN + dataRowLen(pIter->row);
|
||||
}
|
||||
|
||||
// Get the next row in Rows
|
||||
SDataRow tdDataRowsNext(SDataRowsIter *pIter) {
|
||||
SDataRow row = pIter->row;
|
||||
if (row == NULL) return NULL;
|
||||
|
||||
if (pIter->len >= pIter->totalLen) {
|
||||
pIter->row = NULL;
|
||||
} else {
|
||||
pIter->row = (char *)row + dataRowLen(row);
|
||||
pIter->len += dataRowLen(row);
|
||||
}
|
||||
|
||||
return row;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the first part length of a data row for a schema
|
||||
*/
|
||||
|
|
|
@ -82,6 +82,17 @@ extern const int32_t TYPE_BYTES[11];
|
|||
#define TSDB_TIME_PRECISION_MILLI_STR "ms"
|
||||
#define TSDB_TIME_PRECISION_MICRO_STR "us"
|
||||
|
||||
#define T_MEMBER_SIZE(type, member) sizeof(((type *)0)->member)
|
||||
#define T_APPEND_MEMBER(dst, ptr, type, member) \
|
||||
do {\
|
||||
memcpy((void *)(dst), (void *)(&((ptr)->member)), T_MEMBER_SIZE(type, member));\
|
||||
dst = (void *)((char *)(dst) + T_MEMBER_SIZE(type, member));\
|
||||
} while(0)
|
||||
#define T_READ_MEMBER(src, type, target) \
|
||||
do { \
|
||||
(target) = *(type *)(src); \
|
||||
(src) = (void *)((char *)src + sizeof(type));\
|
||||
} while(0)
|
||||
|
||||
#define TSDB_KEYSIZE sizeof(TSKEY)
|
||||
|
||||
|
|
|
@ -35,7 +35,7 @@ extern "C" {
|
|||
|
||||
// ---------- TSDB TABLE DEFINITION
|
||||
typedef struct STable {
|
||||
TSDB_TABLE_TYPE type;
|
||||
int8_t type;
|
||||
STableId tableId;
|
||||
int32_t superUid; // Super table UID
|
||||
int32_t sversion;
|
||||
|
|
|
@ -459,7 +459,7 @@ SSubmitBlk *tsdbGetSubmitMsgNext(SSubmitMsgIter *pIter) {
|
|||
if (pIter->len >= pIter->totalLen) {
|
||||
pIter->pBlock = NULL;
|
||||
} else {
|
||||
pIter->pBlock = (char *)pBlock + pBlock->len + sizeof(SSubmitBlk);
|
||||
pIter->pBlock = (SSubmitBlk *)((char *)pBlock + pBlock->len + sizeof(SSubmitBlk));
|
||||
}
|
||||
|
||||
return pBlock;
|
||||
|
|
|
@ -39,8 +39,21 @@ void *tsdbEncodeTable(STable *pTable, int *contLen) {
|
|||
void *ret = malloc(*contLen);
|
||||
if (ret == NULL) return NULL;
|
||||
|
||||
// TODO: encode the object to the memory
|
||||
{}
|
||||
void *ptr = ret;
|
||||
T_APPEND_MEMBER(ptr, pTable, STable, type);
|
||||
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, uid);
|
||||
T_APPEND_MEMBER(ptr, &(pTable->tableId), STableId, tid);
|
||||
T_APPEND_MEMBER(ptr, pTable, STable, superUid);
|
||||
T_APPEND_MEMBER(ptr, pTable, STable, sversion);
|
||||
|
||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||
ptr = tdEncodeSchema(ptr, pTable->schema);
|
||||
ptr = tdEncodeSchema(ptr, pTable->tagSchema);
|
||||
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
||||
dataRowCpy(ptr, pTable->tagVal);
|
||||
} else {
|
||||
ptr = tdEncodeSchema(ptr, pTable->schema);
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
@ -59,8 +72,20 @@ STable *tsdbDecodeTable(void *cont, int contLen) {
|
|||
STable *pTable = (STable *)calloc(1, sizeof(STable));
|
||||
if (pTable == NULL) return NULL;
|
||||
|
||||
{
|
||||
// TODO recover from the binary content
|
||||
void *ptr = cont;
|
||||
T_READ_MEMBER(ptr, int8_t, pTable->type);
|
||||
T_READ_MEMBER(ptr, int64_t, pTable->tableId.uid);
|
||||
T_READ_MEMBER(ptr, int32_t, pTable->tableId.tid);
|
||||
T_READ_MEMBER(ptr, int32_t, pTable->superUid);
|
||||
T_READ_MEMBER(ptr, int32_t, pTable->sversion);
|
||||
|
||||
if (pTable->type = TSDB_SUPER_TABLE) {
|
||||
pTable->schema = tdDecodeSchema(&ptr);
|
||||
pTable->tagSchema = tdDecodeSchema(&ptr);
|
||||
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
||||
pTable->tagVal = tdDataRowDup(ptr);
|
||||
} else {
|
||||
pTable->schema = tdDecodeSchema(&ptr);
|
||||
}
|
||||
|
||||
return pTable;
|
||||
|
@ -318,8 +343,22 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) {
|
|||
}
|
||||
|
||||
static int tsdbEstimateTableEncodeSize(STable *pTable) {
|
||||
// TODO
|
||||
return 0;
|
||||
int size = 0;
|
||||
size += T_MEMBER_SIZE(STable, type);
|
||||
size += T_MEMBER_SIZE(STable, tableId);
|
||||
size += T_MEMBER_SIZE(STable, superUid);
|
||||
size += T_MEMBER_SIZE(STable, sversion);
|
||||
|
||||
if (pTable->type == TSDB_SUPER_TABLE) {
|
||||
size += tdGetSchemaEncodeSize(pTable->schema);
|
||||
size += tdGetSchemaEncodeSize(pTable->tagSchema);
|
||||
} else if (pTable->type == TSDB_CHILD_TABLE) {
|
||||
size += dataRowLen(pTable->tagVal);
|
||||
} else {
|
||||
size += tdGetSchemaEncodeSize(pTable->schema);
|
||||
}
|
||||
|
||||
return size;
|
||||
}
|
||||
|
||||
static char *getTupleKey(const void * data) {
|
||||
|
|
|
@ -19,6 +19,8 @@
|
|||
#include "hash.h"
|
||||
#include "tsdbMetaFile.h"
|
||||
|
||||
#define TSDB_META_FILE_VERSION_MAJOR 1
|
||||
#define TSDB_META_FILE_VERSION_MINOR 0
|
||||
#define TSDB_META_FILE_HEADER_SIZE 512
|
||||
|
||||
typedef struct {
|
||||
|
@ -182,6 +184,10 @@ static int32_t tsdbCheckMetaHeader(int fd) {
|
|||
|
||||
static int32_t tsdbWriteMetaHeader(int fd) {
|
||||
// TODO: write the meta file header to file
|
||||
char head[TSDB_META_FILE_HEADER_SIZE] = "\0";
|
||||
sprintf(head, "version: %d.%d", TSDB_META_FILE_VERSION_MAJOR, TSDB_META_FILE_VERSION_MINOR);
|
||||
|
||||
write(fd, (void *)head, TSDB_META_FILE_HEADER_SIZE);
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue