fix more code

This commit is contained in:
hzcheng 2020-03-06 08:18:53 +00:00
parent d0e32fbdea
commit 6656c118d6
5 changed files with 165 additions and 212 deletions

View File

@ -1,4 +1,4 @@
#if !defined(_TD_SCHEMA_H_) #ifndef _TD_SCHEMA_H_
#define _TD_SCHEMA_H_ #define _TD_SCHEMA_H_
#include <stdint.h> #include <stdint.h>
@ -6,25 +6,50 @@
#include "type.h" #include "type.h"
// Column definition #ifdef __cplusplus
// TODO: if we need to align the structure extern "C" {
#endif
// ---- Column definition and operations
typedef struct { typedef struct {
td_datatype_t type; // Column type int8_t type; // Column type
int32_t colId; // column ID int16_t colId; // column ID
int32_t bytes; // column bytes int16_t bytes; // column bytes
int32_t offset; // point offset in a row data int32_t offset; // point offset in a row data
char * colName; // the column name
} SColumn; } SColumn;
// Schema definition #define colType(col) ((col)->type)
#define colColId(col) ((col)->colId)
#define colBytes(col) ((col)->bytes)
#define colOffset(col) ((col)->offset)
#define colSetType(col, t) (colType(col) = (t))
#define colSetColId(col, id) (colColId(col) = (id))
#define colSetBytes(col, b) (colBytes(col) = (b))
#define colSetOffset(col, o) (colOffset(col) = (o))
SColumn *tdNewCol(int8_t type, int16_t colId, int16_t bytes);
void tdFreeCol(SColumn *pCol);
void tdColCpy(SColumn *dst, SColumn *src);
// ---- Schema definition and operations
typedef struct { typedef struct {
int32_t version; // schema version, it is used to change the schema
int32_t numOfCols; int32_t numOfCols;
int32_t numOfTags; int32_t padding; // TODO: replace the padding for useful variable
int32_t colIdCounter;
SColumn columns[]; SColumn columns[];
} SSchema; } SSchema;
#define schemaNCols(s) ((s)->numOfCols)
#define schemaColAt(s, i) ((s)->columns + i)
SSchema *tdNewSchema(int32_t nCols);
SSchema *tdDupSchema(SSchema *pSchema);
void tdFreeSchema(SSchema *pSchema);
void tdUpdateSchema(SSchema *pSchema);
int32_t tdMaxRowDataBytes(SSchema *pSchema);
// ---- Inline schema definition and operations
/* Inline schema definition /* Inline schema definition
* +---------+---------+---------+-----+---------+-----------+-----+-----------+ * +---------+---------+---------+-----+---------+-----------+-----+-----------+
* | int32_t | | | | | | | | * | int32_t | | | | | | | |
@ -34,42 +59,10 @@ typedef struct {
*/ */
typedef char *SISchema; typedef char *SISchema;
// TODO: decide if the space is allowed // TODO: add operations on SISchema
#define TD_ISCHEMA_HEADER_SIZE sizeof(int32_t) + sizeof(SSchema)
// ---- operations on SColumn #ifdef __cplusplus
#define TD_COLUMN_TYPE(pCol) ((pCol)->type) // column type }
#define TD_COLUMN_ID(pCol) ((pCol)->colId) // column ID #endif
#define TD_COLUMN_BYTES(pCol) ((pCol)->bytes) // column bytes
#define TD_COLUMN_OFFSET(pCol) ((pCol)->offset) // column bytes
#define TD_COLUMN_NAME(pCol) ((pCol)->colName) // column name
#define TD_COLUMN_INLINE_SIZE(pCol) (sizeof(SColumn) + TD_COLUMN_NAME(pCol) + 1)
// ---- operations on SSchema
#define TD_SCHEMA_VERSION(pSchema) ((pSchema)->version) // schema version
#define TD_SCHEMA_NCOLS(pSchema) ((pSchema)->numOfCols) // schema number of columns
#define TD_SCHEMA_NTAGS(pSchema) ((pSchema)->numOfTags) // schema number of tags
#define TD_SCHEMA_TOTAL_COLS(pSchema) (TD_SCHEMA_NCOLS(pSchema) + TD_SCHEMA_NTAGS(pSchema)) // schema total number of SColumns (#columns + #tags)
#define TD_SCHEMA_NEXT_COLID(pSchema) ((pSchema)->colIdCounter++)
#define TD_SCHEMA_COLS(pSchema) ((pSchema)->columns)
#define TD_SCHEMA_TAGS(pSchema) (TD_SCHEMA_COLS(pSchema) + TD_SCHEMA_NCOLS(pSchema))
#define TD_SCHEMA_COLUMN_AT(pSchema, idx) (TD_SCHEMA_COLS(pSchema) + idx)
#define TD_SCHEMA_TAG_AT(pSchema, idx) (TD_SCHEMA_TAGS(pSchema) + idx)
// ---- operations on SISchema
#define TD_ISCHEMA_LEN(pISchema) *((int32_t *)(pISchema))
#define TD_ISCHEMA_SCHEMA(pISchema) ((SSchema *)((pISchema) + sizeof(int32_t)))
#define TD_ISCHEMA_COL_NAMES(pISchema) ((pISchema) + TD_ISCHEMA_HEADER_SIZE + sizeof(SColumn) * TD_SCHEMA_TOTAL_COLS(TD_ISCHEMA_SCHEMA(pISchema)))
// ----
/* Convert a schema structure to an inline schema structure
*/
SISchema tdConvertSchemaToInline(SSchema *pSchema);
int32_t tdGetColumnIdxByName(SSchema *pSchema, char *colName);
int32_t tdGetColumnIdxById(SSchema *pSchema, int32_t colId);
SSchema *tdDupSchema(SSchema *pSchema);
void tdFreeSchema(SSchema *pSchema);
// ---- TODO: operations to modify schema
#endif // _TD_SCHEMA_H_ #endif // _TD_SCHEMA_H_

View File

@ -2,8 +2,6 @@
#include "dataformat.h" #include "dataformat.h"
static int32_t tdGetMaxDataRowSize(SSchema *pSchema);
/** /**
* Create a data row with maximum row length bytes. * Create a data row with maximum row length bytes.
* *
@ -26,7 +24,7 @@ SDataRow tdNewDataRow(int32_t bytes) {
} }
SDataRow tdNewDdataFromSchema(SSchema *pSchema) { SDataRow tdNewDdataFromSchema(SSchema *pSchema) {
int32_t bytes = tdGetMaxDataRowSize(pSchema); int32_t bytes = tdMaxRowDataBytes(pSchema);
return tdNewDataRow(bytes); return tdNewDataRow(bytes);
} }
@ -77,35 +75,6 @@ int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixO
return 0; return 0;
} }
static int32_t tdGetMaxDataRowSize(SSchema *pSchema) {
int32_t nbytes = 0;
for (int32_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++) {
SColumn * pCol = TD_SCHEMA_COLUMN_AT(pSchema, i);
td_datatype_t type = TD_COLUMN_TYPE(pCol);
nbytes += rowDataLen[type];
switch (type) {
case TD_DATATYPE_VARCHAR:
nbytes += TD_COLUMN_BYTES(pCol);
break;
case TD_DATATYPE_NCHAR:
nbytes += 4 * TD_COLUMN_BYTES(pCol);
break;
case TD_DATATYPE_BINARY:
nbytes += TD_COLUMN_BYTES(pCol);
break;
default:
break;
}
}
nbytes += TD_DATA_ROW_HEADER_SIZE;
return nbytes;
}
SDataRow tdSDataRowDup(SDataRow rdata) { return NULL; } SDataRow tdSDataRowDup(SDataRow rdata) { return NULL; }
void tdFreeSDataRow(SDataRow rdata) { void tdFreeSDataRow(SDataRow rdata) {
if (rdata == NULL) return; if (rdata == NULL) return;

View File

@ -1,6 +1,7 @@
#include <stdlib.h> #include <stdlib.h>
#include "schema.h" #include "schema.h"
const int32_t rowDataLen[] = { const int32_t rowDataLen[] = {
sizeof(int8_t), // TD_DATATYPE_BOOL, sizeof(int8_t), // TD_DATATYPE_BOOL,
sizeof(int8_t), // TD_DATATYPE_TINYINT, sizeof(int8_t), // TD_DATATYPE_TINYINT,
@ -13,93 +14,122 @@ const int32_t rowDataLen[] = {
sizeof(int32_t), // TD_DATATYPE_NCHAR, sizeof(int32_t), // TD_DATATYPE_NCHAR,
sizeof(int32_t) // TD_DATATYPE_BINARY sizeof(int32_t) // TD_DATATYPE_BINARY
}; };
/**
* Create a new SColumn object
* ASSUMPTIONS: VALID PARAMETERS
*
* @param type column type
* @param colId column ID
* @param bytes maximum bytes the col taken
*
* @return a SColumn object on success
* NULL for failure
*/
SColumn *tdNewCol(int8_t type, int16_t colId, int16_t bytes) {
SColumn *pCol = (SColumn *)calloc(1, sizeof(SColumn));
if (pCol == NULL) return NULL;
colSetType(pCol, type);
colSetColId(pCol, colId);
switch (type) {
case TD_DATATYPE_VARCHAR:
case TD_DATATYPE_NCHAR:
case TD_DATATYPE_BINARY:
colSetBytes(pCol, bytes);
break;
default:
colSetBytes(pCol, rowDataLen[type]);
break;
}
return pCol;
}
/**
* Free a SColumn object CREATED with tdNewCol
*/
void tdFreeCol(SColumn *pCol) {
if (pCol) free(pCol);
}
void tdColCpy(SColumn *dst, SColumn *src) { memcpy((void *)dst, (void *)src, sizeof(SColumn)); }
/**
* Create a SSchema object with nCols columns
* ASSUMPTIONS: VALID PARAMETERS
*
* @param nCols number of columns the schema has
*
* @return a SSchema object for success
* NULL for failure
*/
SSchema *tdNewSchema(int32_t nCols) {
int32_t size = sizeof(SSchema) + sizeof(SColumn) * nCols;
SSchema *pSchema = (SSchema *)calloc(1, size);
if (pSchema == NULL) return NULL;
pSchema->numOfCols = nCols;
return pSchema;
}
/**
* Free the SSchema object created by tdNewSchema or tdDupSchema
*/
void tdFreeSchema(SSchema *pSchema) { void tdFreeSchema(SSchema *pSchema) {
// TODO if (pSchema == NULL) free(pSchema);
return;
}
static size_t tdGetEstimatedISchemaLen(SSchema *pSchema) {
size_t colNameLen = 0;
for (size_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++) {
colNameLen += (strlen(TD_COLUMN_NAME(TD_SCHEMA_COLUMN_AT(pSchema, i))) + 1);
}
for (size_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++) {
colNameLen += (strlen(TD_COLUMN_NAME(TD_SCHEMA_COLUMN_AT(pSchema, i))) + 1);
}
return TD_ISCHEMA_HEADER_SIZE + (size_t)TD_SCHEMA_TOTAL_COLS(pSchema) + colNameLen;
}
static void tdUpdateColumnOffsets(SSchema *pSchema) {
int32_t offset = 0;
for (size_t i = 0; i < TD_SCHEMA_NCOLS(pSchema); i++)
{
SColumn *pCol = TD_SCHEMA_COLUMN_AT(pSchema, i);
TD_COLUMN_OFFSET(pCol) = offset;
offset += rowDataLen[TD_COLUMN_TYPE(pCol)];
}
offset = 0;
for (size_t i = 0; i < TD_SCHEMA_NTAGS(pSchema); i++) {
SColumn *pCol = TD_SCHEMA_TAG_AT(pSchema, i);
TD_COLUMN_OFFSET(pCol) = offset;
offset += rowDataLen[TD_COLUMN_TYPE(pCol)];
}
}
SISchema tdConvertSchemaToInline(SSchema *pSchema) {
size_t len = tdGetEstimatedISchemaLen(pSchema);
int32_t totalCols = TD_SCHEMA_TOTAL_COLS(pSchema);
// TODO: if use pISchema is reasonable?
SISchema pISchema = malloc(len);
if (pSchema == NULL) {
// TODO: add error handling
return NULL;
}
TD_ISCHEMA_LEN(pISchema) = (int32_t)len;
memcpy((void *)TD_ISCHEMA_SCHEMA(pISchema), (void *)pSchema, sizeof(SSchema));
// TD_SCHEMA_COLS(TD_ISCHEMA_SCHEMA(pISchema)) = (SColumn *)(pISchema + TD_ISCHEMA_HEADER_SIZE);
memcpy((void *)TD_SCHEMA_COLS(TD_ISCHEMA_SCHEMA(pISchema)), (void *)TD_SCHEMA_COLS(pSchema),
sizeof(SColumn) * totalCols);
char *pName = TD_ISCHEMA_COL_NAMES(pISchema);
for (int32_t i = 0; i < totalCols; i++) {
SColumn *pCol = TD_SCHEMA_COLUMN_AT(TD_ISCHEMA_SCHEMA(pISchema), i);
char * colName = TD_COLUMN_NAME(TD_SCHEMA_COLUMN_AT(pSchema, i));
TD_COLUMN_NAME(pCol) = pName;
size_t tlen = strlen(colName) + 1;
memcpy((void *)pName, (void *)colName, tlen);
pName += tlen;
}
return pISchema;
}
int32_t tdGetColumnIdxByName(SSchema *pSchema, char *colName) {
for (int32_t i = 0; i < TD_SCHEMA_TOTAL_COLS(pSchema); i++) {
SColumn *pCol = TD_SCHEMA_COLUMN_AT(pSchema, i);
if (strcmp(colName, TD_COLUMN_NAME(pCol)) == 0) {
return i;
}
}
return -1;
}
int32_t tdGetColumnIdxById(SSchema *pSchema, int32_t colId) {
for (int32_t i = 0; i < TD_SCHEMA_TOTAL_COLS(pSchema); i++) {
SColumn *pCol = TD_SCHEMA_COLUMN_AT(pSchema, i);
if (TD_COLUMN_ID(pCol) == colId) {
return i;
}
}
return -1;
} }
SSchema *tdDupSchema(SSchema *pSchema) { SSchema *tdDupSchema(SSchema *pSchema) {
return NULL; SSchema *tSchema = tdNewSchema(schemaNCols(pSchema));
if (tSchema == NULL) return NULL;
int32_t size = sizeof(SSchema) + sizeof(SColumn) * schemaNCols(pSchema);
memcpy((void *)tSchema, (void *)pSchema, size);
return tSchema;
}
/**
* Function to update each columns's offset field in the schema.
* ASSUMPTIONS: VALID PARAMETERS
*/
void tdUpdateSchema(SSchema *pSchema) {
SColumn *pCol = NULL;
int32_t offset = 0;
for (int i = 0; i < schemaNCols(pSchema); i++) {
pCol = schemaColAt(pSchema, i);
colSetOffset(pCol, offset);
offset += rowDataLen[pCol->type];
}
}
/**
* Get the maximum size of a row data with the schema
*/
int32_t tdMaxRowDataBytes(SSchema *pSchema) {
int32_t size = 0;
SColumn *pCol = NULL;
for (int i = 0; i < schemaNCols(pSchema); i++) {
pCol = schemaColAt(pSchema, i);
size += rowDataLen[pCol->type];
switch (pCol->type) {
case TD_DATATYPE_VARCHAR:
size += (pCol->bytes + 1); // TODO: remove literal here
break;
case TD_DATATYPE_NCHAR:
size += (pCol->bytes + 4); // TODO: check and remove literal here
break;
case TD_DATATYPE_BINARY:
size += pCol->bytes;
break;
default:
break;
}
}
return size;
} }

View File

@ -1,7 +1,7 @@
aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) aux_source_directory(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST)
add_executable(tsdbTests ${SOURCE_LIST}) add_executable(tsdbTests ${SOURCE_LIST})
target_link_libraries(tsdbTests gtest gtest_main pthread tsdb) target_link_libraries(tsdbTests gtest gtest_main pthread common tsdb)
add_test( add_test(
NAME NAME

View File

@ -10,56 +10,17 @@ TEST(TsdbTest, createTable) {
STableCfg config; STableCfg config;
config.tableId.tid = 0; config.tableId.tid = 0;
config.tableId.uid = 98868728187539L; config.tableId.uid = 98868728187539L;
config.numOfCols = 2; config.numOfCols = 5;
config.schema = (SSchema *)malloc(sizeof(SSchema) + sizeof(SColumn) * config.numOfCols); config.schema = tdNewSchema(config.numOfCols);
config.schema->version = 0; for (int i = 0; i < schemaNCols(config.schema); i++) {
config.schema->numOfCols = 2; SColumn *pCol = tdNewCol(TD_DATATYPE_BIGINT, i, 0);
config.schema->numOfTags = 0; tdColCpy(schemaColAt(config.schema, i), pCol);
config.schema->colIdCounter = 1; tdFreeCol(pCol);
for (int i = 0; i < config.numOfCols; i++) {
SColumn *pCol = config.schema->columns + i;
pCol->type = TD_DATATYPE_BIGINT;
pCol->colId = config.schema->colIdCounter++;
pCol->offset = 10;
pCol->colName = strdup("col1");
} }
config.tagValues = nullptr; config.tagValues = nullptr;
tsdbCreateTableImpl(pMeta, &config), 0; tsdbCreateTableImpl(pMeta, &config);
STable *pTable = tsdbGetTableByUid(pMeta, config.tableId.uid); STable *pTable = tsdbGetTableByUid(pMeta, config.tableId.uid);
ASSERT_NE(pTable, nullptr); ASSERT_NE(pTable, nullptr);
}
TEST(TsdbTest, DISABLED_createTsdbRepo) {
STsdbCfg *pCfg = tsdbCreateDefaultCfg();
tsdb_repo_t *pRepo = tsdbCreateRepo("/root/mnt/test/vnode0", pCfg, NULL);
tsdbFreeCfg(pCfg);
ASSERT_NE(pRepo, nullptr);
STableCfg config;
config.tableId.tid = 0;
config.tableId.uid = 10889498868728187539;
config.numOfCols = 2;
config.schema = (SSchema *)malloc(sizeof(SSchema) + sizeof(SColumn) * config.numOfCols);
config.schema->version = 0;
config.schema->numOfCols = 2;
config.schema->numOfTags = 0;
config.schema->colIdCounter = 1;
for (int i = 0; i < config.numOfCols; i++) {
SColumn *pCol = config.schema->columns + i;
pCol->type = TD_DATATYPE_BIGINT;
pCol->colId = config.schema->colIdCounter++;
pCol->offset = 10;
pCol->colName = strdup("col1");
}
config.tagValues = NULL;
tsdbCreateTable(pRepo, &config);
tsdbCloseRepo(pRepo);
} }