TD-166
This commit is contained in:
parent
a7a7ef79e6
commit
519b261cc2
|
@ -30,7 +30,7 @@ typedef struct {
|
||||||
int8_t type; // Column type
|
int8_t type; // Column type
|
||||||
int16_t colId; // column ID
|
int16_t colId; // column ID
|
||||||
int32_t bytes; // column bytes
|
int32_t bytes; // column bytes
|
||||||
int32_t offset; // point offset in a row data
|
int32_t offset; // point offset in SDataRow after the header part
|
||||||
} STColumn;
|
} STColumn;
|
||||||
|
|
||||||
#define colType(col) ((col)->type)
|
#define colType(col) ((col)->type)
|
||||||
|
@ -43,26 +43,25 @@ typedef struct {
|
||||||
#define colSetBytes(col, b) (colBytes(col) = (b))
|
#define colSetBytes(col, b) (colBytes(col) = (b))
|
||||||
#define colSetOffset(col, o) (colOffset(col) = (o))
|
#define colSetOffset(col, o) (colOffset(col) = (o))
|
||||||
|
|
||||||
STColumn *tdNewCol(int8_t type, int16_t colId, int16_t bytes);
|
|
||||||
void tdFreeCol(STColumn *pCol);
|
|
||||||
void tdColCpy(STColumn *dst, STColumn *src);
|
|
||||||
void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes);
|
|
||||||
|
|
||||||
// ----------------- TSDB SCHEMA DEFINITION
|
// ----------------- TSDB SCHEMA DEFINITION
|
||||||
typedef struct {
|
typedef struct {
|
||||||
|
int totalCols; // Total columns allocated
|
||||||
int numOfCols; // Number of columns appended
|
int numOfCols; // Number of columns appended
|
||||||
int padding; // Total columns allocated
|
int tlen; // maximum length of a SDataRow without the header part
|
||||||
|
int flen; // First part length in a SDataRow after the header part
|
||||||
STColumn columns[];
|
STColumn columns[];
|
||||||
} STSchema;
|
} STSchema;
|
||||||
|
|
||||||
#define schemaNCols(s) ((s)->numOfCols)
|
#define schemaNCols(s) ((s)->numOfCols)
|
||||||
|
#define schemaTotalCols(s) ((s)->totalCols)
|
||||||
|
#define schemaTLen(s) ((s)->tlen)
|
||||||
|
#define schemaFLen(s) ((s)->flen)
|
||||||
#define schemaColAt(s, i) ((s)->columns + i)
|
#define schemaColAt(s, i) ((s)->columns + i)
|
||||||
|
|
||||||
STSchema *tdNewSchema(int32_t nCols);
|
STSchema *tdNewSchema(int32_t nCols);
|
||||||
int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes);
|
#define tdFreeSchema(s) tfree((s))
|
||||||
|
int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes);
|
||||||
STSchema *tdDupSchema(STSchema *pSchema);
|
STSchema *tdDupSchema(STSchema *pSchema);
|
||||||
void tdFreeSchema(STSchema *pSchema);
|
|
||||||
void tdUpdateSchema(STSchema *pSchema);
|
|
||||||
int tdGetSchemaEncodeSize(STSchema *pSchema);
|
int tdGetSchemaEncodeSize(STSchema *pSchema);
|
||||||
void * tdEncodeSchema(void *dst, STSchema *pSchema);
|
void * tdEncodeSchema(void *dst, STSchema *pSchema);
|
||||||
STSchema *tdDecodeSchema(void **psrc);
|
STSchema *tdDecodeSchema(void **psrc);
|
||||||
|
@ -70,36 +69,30 @@ STSchema *tdDecodeSchema(void **psrc);
|
||||||
// ----------------- Data row structure
|
// ----------------- Data row structure
|
||||||
|
|
||||||
/* A data row, the format is like below:
|
/* A data row, the format is like below:
|
||||||
* +----------+---------+---------------------------------+---------------------------------+
|
* |<------------------------------------- len ---------------------------------->|
|
||||||
* | int32_t | int32_t | | |
|
* |<--Head ->|<--------- flen -------------->| |
|
||||||
* +----------+---------+---------------------------------+---------------------------------+
|
* +----------+---------------------------------+---------------------------------+
|
||||||
* | len | flen | First part | Second part |
|
* | int32_t | | |
|
||||||
* +----------+---------+---------------------------------+---------------------------------+
|
* +----------+---------------------------------+---------------------------------+
|
||||||
* plen: first part length
|
* | len | First part | Second part |
|
||||||
* len: the length including sizeof(row) + sizeof(len)
|
* +----------+---------------------------------+---------------------------------+
|
||||||
* row: actual row data encoding
|
|
||||||
*/
|
*/
|
||||||
typedef void *SDataRow;
|
typedef void *SDataRow;
|
||||||
|
|
||||||
|
#define TD_DATA_ROW_HEAD_SIZE sizeof(int32_t)
|
||||||
#define TD_DATA_ROW_HEAD_SIZE (2 * sizeof(int32_t))
|
|
||||||
|
|
||||||
#define dataRowLen(r) (*(int32_t *)(r))
|
#define dataRowLen(r) (*(int32_t *)(r))
|
||||||
#define dataRowFLen(r) (*(int32_t *)((char *)(r) + sizeof(int32_t)))
|
#define dataRowAt(r, idx) ((char *)(r) + (idx))
|
||||||
#define dataRowTuple(r) ((char *)(r) + TD_DATA_ROW_HEAD_SIZE)
|
#define dataRowTuple(r) dataRowAt(r, TD_DATA_ROW_HEAD_SIZE)
|
||||||
#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r)))
|
#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r)))
|
||||||
#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
|
#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
|
||||||
#define dataRowSetFLen(r, l) (dataRowFLen(r) = (l))
|
|
||||||
#define dataRowIdx(r, i) ((char *)(r) + i)
|
|
||||||
#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
|
#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
|
||||||
#define dataRowAt(r, idx) ((char *)(r) + (idx))
|
#define dataRowMaxBytesFromSchema(s) ((s)->tlen + TD_DATA_ROW_HEAD_SIZE)
|
||||||
|
|
||||||
void tdInitDataRow(SDataRow row, STSchema *pSchema);
|
|
||||||
int tdMaxRowBytesFromSchema(STSchema *pSchema);
|
|
||||||
SDataRow tdNewDataRow(int32_t bytes, STSchema *pSchema);
|
|
||||||
SDataRow tdNewDataRowFromSchema(STSchema *pSchema);
|
SDataRow tdNewDataRowFromSchema(STSchema *pSchema);
|
||||||
void tdFreeDataRow(SDataRow row);
|
void tdFreeDataRow(SDataRow row);
|
||||||
int tdAppendColVal(SDataRow row, void *value, STColumn *pCol);
|
void tdInitDataRow(SDataRow row, STSchema *pSchema);
|
||||||
|
int tdAppendColVal(SDataRow row, void *value, STSchema *pSchema, int col);
|
||||||
void tdDataRowReset(SDataRow row, STSchema *pSchema);
|
void tdDataRowReset(SDataRow row, STSchema *pSchema);
|
||||||
SDataRow tdDataRowDup(SDataRow row);
|
SDataRow tdDataRowDup(SDataRow row);
|
||||||
|
|
||||||
|
|
|
@ -15,71 +15,6 @@
|
||||||
#include "tdataformat.h"
|
#include "tdataformat.h"
|
||||||
#include "tutil.h"
|
#include "tutil.h"
|
||||||
|
|
||||||
static int tdFLenFromSchema(STSchema *pSchema);
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Create a new STColumn object
|
|
||||||
* ASSUMPTIONS: VALID PARAMETERS
|
|
||||||
*
|
|
||||||
* @param type column type
|
|
||||||
* @param colId column ID
|
|
||||||
* @param bytes maximum bytes the col taken
|
|
||||||
*
|
|
||||||
* @return a STColumn object on success
|
|
||||||
* NULL for failure
|
|
||||||
*/
|
|
||||||
STColumn *tdNewCol(int8_t type, int16_t colId, int16_t bytes) {
|
|
||||||
if (!isValidDataType(type, 0)) return NULL;
|
|
||||||
|
|
||||||
STColumn *pCol = (STColumn *)calloc(1, sizeof(STColumn));
|
|
||||||
if (pCol == NULL) return NULL;
|
|
||||||
|
|
||||||
colSetType(pCol, type);
|
|
||||||
colSetColId(pCol, colId);
|
|
||||||
colSetOffset(pCol, -1);
|
|
||||||
switch (type) {
|
|
||||||
case TSDB_DATA_TYPE_BINARY:
|
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
|
||||||
colSetBytes(pCol, bytes);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
colSetBytes(pCol, TYPE_BYTES[type]);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
return pCol;
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Free a STColumn object CREATED with tdNewCol
|
|
||||||
*/
|
|
||||||
void tdFreeCol(STColumn *pCol) {
|
|
||||||
if (pCol) free(pCol);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Copy from source to destinition
|
|
||||||
*/
|
|
||||||
void tdColCpy(STColumn *dst, STColumn *src) { memcpy((void *)dst, (void *)src, sizeof(STColumn)); }
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Set the column
|
|
||||||
*/
|
|
||||||
void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes) {
|
|
||||||
colSetType(pCol, type);
|
|
||||||
colSetColId(pCol, colId);
|
|
||||||
switch (type)
|
|
||||||
{
|
|
||||||
case TSDB_DATA_TYPE_BINARY:
|
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
|
||||||
colSetBytes(pCol, bytes);
|
|
||||||
break;
|
|
||||||
default:
|
|
||||||
colSetBytes(pCol, TYPE_BYTES[type]);
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Create a SSchema object with nCols columns
|
* Create a SSchema object with nCols columns
|
||||||
* ASSUMPTIONS: VALID PARAMETERS
|
* ASSUMPTIONS: VALID PARAMETERS
|
||||||
|
@ -90,11 +25,15 @@ void tdSetCol(STColumn *pCol, int8_t type, int16_t colId, int32_t bytes) {
|
||||||
* NULL for failure
|
* NULL for failure
|
||||||
*/
|
*/
|
||||||
STSchema *tdNewSchema(int32_t nCols) {
|
STSchema *tdNewSchema(int32_t nCols) {
|
||||||
int32_t size = sizeof(STSchema) + sizeof(STColumn) * nCols;
|
int32_t size = sizeof(STSchema) + sizeof(STColumn) * nCols;
|
||||||
|
|
||||||
STSchema *pSchema = (STSchema *)calloc(1, size);
|
STSchema *pSchema = (STSchema *)calloc(1, size);
|
||||||
if (pSchema == NULL) return NULL;
|
if (pSchema == NULL) return NULL;
|
||||||
|
|
||||||
pSchema->numOfCols = 0;
|
pSchema->numOfCols = 0;
|
||||||
|
pSchema->totalCols = nCols;
|
||||||
|
pSchema->flen = 0;
|
||||||
|
pSchema->tlen = 0;
|
||||||
|
|
||||||
return pSchema;
|
return pSchema;
|
||||||
}
|
}
|
||||||
|
@ -102,25 +41,33 @@ STSchema *tdNewSchema(int32_t nCols) {
|
||||||
/**
|
/**
|
||||||
* Append a column to the schema
|
* Append a column to the schema
|
||||||
*/
|
*/
|
||||||
int tdSchemaAppendCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) {
|
int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) {
|
||||||
// if (pSchema->numOfCols >= pSchema->totalCols) return -1;
|
if (!isValidDataType(type, 0) || pSchema->numOfCols >= pSchema->totalCols) return -1;
|
||||||
if (!isValidDataType(type, 0)) return -1;
|
|
||||||
|
|
||||||
STColumn *pCol = schemaColAt(pSchema, schemaNCols(pSchema));
|
STColumn *pCol = schemaColAt(pSchema, schemaNCols(pSchema));
|
||||||
colSetType(pCol, type);
|
colSetType(pCol, type);
|
||||||
colSetColId(pCol, colId);
|
colSetColId(pCol, colId);
|
||||||
colSetOffset(pCol, -1);
|
if (pSchema->numOfCols == 0) {
|
||||||
|
colSetOffset(pCol, 0);
|
||||||
|
} else {
|
||||||
|
colSetOffset(pCol, pSchema->columns[pSchema->numOfCols - 1].offset + TYPE_BYTES[type]);
|
||||||
|
}
|
||||||
switch (type) {
|
switch (type) {
|
||||||
case TSDB_DATA_TYPE_BINARY:
|
case TSDB_DATA_TYPE_BINARY:
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
colSetBytes(pCol, bytes);
|
colSetBytes(pCol, bytes);
|
||||||
|
pSchema->tlen += (TYPE_BYTES[type] + sizeof(int16_t) + bytes); // TODO: remove int16_t here
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
colSetBytes(pCol, TYPE_BYTES[type]);
|
colSetBytes(pCol, TYPE_BYTES[type]);
|
||||||
|
pSchema->tlen += TYPE_BYTES[type];
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
pSchema->numOfCols++;
|
pSchema->numOfCols++;
|
||||||
|
pSchema->flen += TYPE_BYTES[type];
|
||||||
|
|
||||||
|
ASSERT(pCol->offset < pSchema->flen);
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
|
@ -138,40 +85,22 @@ STSchema *tdDupSchema(STSchema *pSchema) {
|
||||||
return tSchema;
|
return tSchema;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Free the SSchema object created by tdNewSchema or tdDupSchema
|
|
||||||
*/
|
|
||||||
void tdFreeSchema(STSchema *pSchema) {
|
|
||||||
if (pSchema != NULL) free(pSchema);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Function to update each columns's offset field in the schema.
|
|
||||||
* ASSUMPTIONS: VALID PARAMETERS
|
|
||||||
*/
|
|
||||||
void tdUpdateSchema(STSchema *pSchema) {
|
|
||||||
STColumn *pCol = NULL;
|
|
||||||
int32_t offset = TD_DATA_ROW_HEAD_SIZE;
|
|
||||||
for (int i = 0; i < schemaNCols(pSchema); i++) {
|
|
||||||
pCol = schemaColAt(pSchema, i);
|
|
||||||
colSetOffset(pCol, offset);
|
|
||||||
offset += TYPE_BYTES[pCol->type];
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return the size of encoded schema
|
* Return the size of encoded schema
|
||||||
*/
|
*/
|
||||||
int tdGetSchemaEncodeSize(STSchema *pSchema) {
|
int tdGetSchemaEncodeSize(STSchema *pSchema) {
|
||||||
return sizeof(STSchema) + schemaNCols(pSchema) * (T_MEMBER_SIZE(STColumn, type) + T_MEMBER_SIZE(STColumn, colId) +
|
return T_MEMBER_SIZE(STSchema, totalCols) +
|
||||||
T_MEMBER_SIZE(STColumn, bytes));
|
schemaNCols(pSchema) *
|
||||||
|
(T_MEMBER_SIZE(STColumn, type) + T_MEMBER_SIZE(STColumn, colId) + T_MEMBER_SIZE(STColumn, bytes));
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Encode a schema to dst, and return the next pointer
|
* Encode a schema to dst, and return the next pointer
|
||||||
*/
|
*/
|
||||||
void *tdEncodeSchema(void *dst, STSchema *pSchema) {
|
void *tdEncodeSchema(void *dst, STSchema *pSchema) {
|
||||||
T_APPEND_MEMBER(dst, pSchema, STSchema, numOfCols);
|
ASSERT(pSchema->numOfCols == pSchema->totalCols);
|
||||||
|
|
||||||
|
T_APPEND_MEMBER(dst, pSchema, STSchema, totalCols);
|
||||||
for (int i = 0; i < schemaNCols(pSchema); i++) {
|
for (int i = 0; i < schemaNCols(pSchema); i++) {
|
||||||
STColumn *pCol = schemaColAt(pSchema, i);
|
STColumn *pCol = schemaColAt(pSchema, i);
|
||||||
T_APPEND_MEMBER(dst, pCol, STColumn, type);
|
T_APPEND_MEMBER(dst, pCol, STColumn, type);
|
||||||
|
@ -186,13 +115,13 @@ void *tdEncodeSchema(void *dst, STSchema *pSchema) {
|
||||||
* Decode a schema from a binary.
|
* Decode a schema from a binary.
|
||||||
*/
|
*/
|
||||||
STSchema *tdDecodeSchema(void **psrc) {
|
STSchema *tdDecodeSchema(void **psrc) {
|
||||||
int numOfCols = 0;
|
int totalCols = 0;
|
||||||
|
|
||||||
T_READ_MEMBER(*psrc, int, numOfCols);
|
T_READ_MEMBER(*psrc, int, totalCols);
|
||||||
|
|
||||||
STSchema *pSchema = tdNewSchema(numOfCols);
|
STSchema *pSchema = tdNewSchema(totalCols);
|
||||||
if (pSchema == NULL) return NULL;
|
if (pSchema == NULL) return NULL;
|
||||||
for (int i = 0; i < numOfCols; i++) {
|
for (int i = 0; i < totalCols; i++) {
|
||||||
int8_t type = 0;
|
int8_t type = 0;
|
||||||
int16_t colId = 0;
|
int16_t colId = 0;
|
||||||
int32_t bytes = 0;
|
int32_t bytes = 0;
|
||||||
|
@ -200,7 +129,7 @@ STSchema *tdDecodeSchema(void **psrc) {
|
||||||
T_READ_MEMBER(*psrc, int16_t, colId);
|
T_READ_MEMBER(*psrc, int16_t, colId);
|
||||||
T_READ_MEMBER(*psrc, int32_t, bytes);
|
T_READ_MEMBER(*psrc, int32_t, bytes);
|
||||||
|
|
||||||
tdSchemaAppendCol(pSchema, type, colId, bytes);
|
tdSchemaAddCol(pSchema, type, colId, bytes);
|
||||||
}
|
}
|
||||||
|
|
||||||
return pSchema;
|
return pSchema;
|
||||||
|
@ -209,53 +138,18 @@ STSchema *tdDecodeSchema(void **psrc) {
|
||||||
/**
|
/**
|
||||||
* Initialize a data row
|
* Initialize a data row
|
||||||
*/
|
*/
|
||||||
void tdInitDataRow(SDataRow row, STSchema *pSchema) {
|
void tdInitDataRow(SDataRow row, STSchema *pSchema) { dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema)); }
|
||||||
dataRowSetFLen(row, TD_DATA_ROW_HEAD_SIZE);
|
|
||||||
dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + tdFLenFromSchema(pSchema));
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
|
||||||
* Create a data row with maximum row length bytes.
|
int32_t size = dataRowMaxBytesFromSchema(pSchema);
|
||||||
*
|
|
||||||
* NOTE: THE AAPLICATION SHOULD MAKE SURE BYTES IS LARGE ENOUGH TO
|
|
||||||
* HOLD THE WHOE ROW.
|
|
||||||
*
|
|
||||||
* @param bytes max bytes a row can take
|
|
||||||
* @return SDataRow object for success
|
|
||||||
* NULL for failure
|
|
||||||
*/
|
|
||||||
SDataRow tdNewDataRow(int32_t bytes, STSchema *pSchema) {
|
|
||||||
int32_t size = sizeof(int32_t) + bytes;
|
|
||||||
|
|
||||||
SDataRow row = malloc(size);
|
SDataRow row = malloc(size);
|
||||||
if (row == NULL) return NULL;
|
if (row == NULL) return NULL;
|
||||||
|
|
||||||
tdInitDataRow(row, pSchema);
|
tdInitDataRow(row, pSchema);
|
||||||
|
|
||||||
return row;
|
return row;
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Get maximum bytes a data row from a schema
|
|
||||||
* ASSUMPTIONS: VALID PARAMETER
|
|
||||||
*/
|
|
||||||
int tdMaxRowBytesFromSchema(STSchema *pSchema) {
|
|
||||||
// TODO
|
|
||||||
int bytes = TD_DATA_ROW_HEAD_SIZE;
|
|
||||||
for (int i = 0; i < schemaNCols(pSchema); i++) {
|
|
||||||
STColumn *pCol = schemaColAt(pSchema, i);
|
|
||||||
bytes += TYPE_BYTES[pCol->type];
|
|
||||||
|
|
||||||
if (pCol->type == TSDB_DATA_TYPE_BINARY || pCol->type == TSDB_DATA_TYPE_NCHAR) {
|
|
||||||
bytes += pCol->bytes;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return bytes;
|
|
||||||
}
|
|
||||||
|
|
||||||
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { return tdNewDataRow(tdMaxRowBytesFromSchema(pSchema), pSchema); }
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Free the SDataRow object
|
* Free the SDataRow object
|
||||||
*/
|
*/
|
||||||
|
@ -266,20 +160,36 @@ void tdFreeDataRow(SDataRow row) {
|
||||||
/**
|
/**
|
||||||
* Append a column value to the data row
|
* Append a column value to the data row
|
||||||
*/
|
*/
|
||||||
int tdAppendColVal(SDataRow row, void *value, STColumn *pCol) {
|
int tdAppendColVal(SDataRow row, void *value, STSchema *pSchema, int col) {
|
||||||
switch (colType(pCol))
|
ASSERT(schemaNCols(pSchema) > col);
|
||||||
{
|
STColumn *pCol = schemaColAt(pSchema, col);
|
||||||
case TSDB_DATA_TYPE_BINARY:
|
int32_t toffset = pCol->offset + TD_DATA_ROW_HEAD_SIZE;
|
||||||
case TSDB_DATA_TYPE_NCHAR:
|
char * ptr = dataRowAt(row, dataRowLen(row));
|
||||||
*(int32_t *)dataRowAt(row, dataRowFLen(row)) = dataRowLen(row);
|
|
||||||
dataRowFLen(row) += TYPE_BYTES[colType(pCol)];
|
switch (colType(pCol)) {
|
||||||
memcpy((void *)dataRowAt(row, dataRowLen(row)), value, strlen(value));
|
case TSDB_DATA_TYPE_BINARY:
|
||||||
dataRowLen(row) += strlen(value);
|
case TSDB_DATA_TYPE_NCHAR:
|
||||||
break;
|
if (value == NULL) {
|
||||||
default:
|
*(int32_t *)dataRowAt(row, toffset) = -1;
|
||||||
memcpy(dataRowAt(row, dataRowFLen(row)), value, TYPE_BYTES[colType(pCol)]);
|
} else {
|
||||||
dataRowFLen(row) += TYPE_BYTES[colType(pCol)];
|
int16_t slen = (colType(pCol) == TSDB_DATA_TYPE_BINARY) ? strlen((char *)value)
|
||||||
break;
|
: wcslen((wchar_t *)value) * TSDB_NCHAR_SIZE;
|
||||||
|
if (slen > colBytes(pCol)) return -1;
|
||||||
|
|
||||||
|
*(int32_t *)dataRowAt(row, toffset) = dataRowLen(row);
|
||||||
|
*(int16_t *)ptr = slen;
|
||||||
|
ptr += sizeof(int16_t);
|
||||||
|
memcpy((void *)ptr, value, slen);
|
||||||
|
dataRowLen(row) += (sizeof(int16_t) + slen);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
if (value == NULL) {
|
||||||
|
setNull(dataRowAt(row, toffset), colType(pCol), colBytes(pCol));
|
||||||
|
} else {
|
||||||
|
memcpy(dataRowAt(row, toffset), value, TYPE_BYTES[colType(pCol)]);
|
||||||
|
}
|
||||||
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
return 0;
|
return 0;
|
||||||
|
@ -392,19 +302,6 @@ void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop) {
|
||||||
pCols->numOfPoints = pointsLeft;
|
pCols->numOfPoints = pointsLeft;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Return the first part length of a data row for a schema
|
|
||||||
*/
|
|
||||||
static int tdFLenFromSchema(STSchema *pSchema) {
|
|
||||||
int ret = 0;
|
|
||||||
for (int i = 0; i < schemaNCols(pSchema); i++) {
|
|
||||||
STColumn *pCol = schemaColAt(pSchema, i);
|
|
||||||
ret += TYPE_BYTES[pCol->type];
|
|
||||||
}
|
|
||||||
|
|
||||||
return ret;
|
|
||||||
}
|
|
||||||
|
|
||||||
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
|
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge) {
|
||||||
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints);
|
ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfPoints);
|
||||||
|
|
||||||
|
|
|
@ -451,9 +451,8 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
|
||||||
// Update the pMeta->maxCols and pMeta->maxRowBytes
|
// Update the pMeta->maxCols and pMeta->maxRowBytes
|
||||||
if (pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_NORMAL_TABLE) {
|
if (pTable->type == TSDB_SUPER_TABLE || pTable->type == TSDB_NORMAL_TABLE) {
|
||||||
if (schemaNCols(pTable->schema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pTable->schema);
|
if (schemaNCols(pTable->schema) > pMeta->maxCols) pMeta->maxCols = schemaNCols(pTable->schema);
|
||||||
int bytes = tdMaxRowBytesFromSchema(pTable->schema);
|
int bytes = dataRowMaxBytesFromSchema(pTable->schema);
|
||||||
if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes;
|
if (bytes > pMeta->maxRowBytes) pMeta->maxRowBytes = bytes;
|
||||||
tdUpdateSchema(pTable->schema);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return tsdbAddTableIntoMap(pMeta, pTable);
|
return tsdbAddTableIntoMap(pMeta, pTable);
|
||||||
|
|
|
@ -330,7 +330,7 @@ int tsdbWriteDataBlock(SRWHelper *pHelper, SDataCols *pDataCols) {
|
||||||
int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks);
|
int blkIdx = (pCompBlock == NULL) ? (pIdx->numOfBlocks - 1) : (pCompBlock - pHelper->pCompInfo->blocks);
|
||||||
|
|
||||||
if (pCompBlock == NULL) { // No key overlap, must has last block, just merge with the last block
|
if (pCompBlock == NULL) { // No key overlap, must has last block, just merge with the last block
|
||||||
ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfSuperBlocks - 1].last);
|
ASSERT(pIdx->hasLast && pHelper->pCompInfo->blocks[pIdx->numOfBlocks - 1].last);
|
||||||
rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
|
rowsToWrite = tsdbMergeDataWithBlock(pHelper, blkIdx, pDataCols);
|
||||||
if (rowsToWrite < 0) goto _err;
|
if (rowsToWrite < 0) goto _err;
|
||||||
} else { // Has key overlap
|
} else { // Has key overlap
|
||||||
|
@ -782,7 +782,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
||||||
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
|
TSKEY keyFirst = dataColsKeyFirst(pDataCols);
|
||||||
|
|
||||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
||||||
ASSERT(blkIdx < pIdx->numOfSuperBlocks);
|
ASSERT(blkIdx < pIdx->numOfBlocks);
|
||||||
|
|
||||||
// SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
|
// SCompBlock *pCompBlock = pHelper->pCompInfo->blocks + blkIdx;
|
||||||
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfSubBlocks >= 1);
|
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfSubBlocks >= 1);
|
||||||
|
@ -790,7 +790,7 @@ static int tsdbMergeDataWithBlock(SRWHelper *pHelper, int blkIdx, SDataCols *pDa
|
||||||
// ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0);
|
// ASSERT(compareKeyBlock((void *)&keyFirst, (void *)pCompBlock) == 0);
|
||||||
|
|
||||||
if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append
|
if (keyFirst > blockAtIdx(pHelper, blkIdx)->keyLast) { // Merge with the last block by append
|
||||||
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock && blkIdx == pIdx->numOfSuperBlocks-1);
|
ASSERT(blockAtIdx(pHelper, blkIdx)->numOfPoints < pHelper->config.minRowsPerFileBlock && blkIdx == pIdx->numOfBlocks-1);
|
||||||
int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface
|
int defaultRowsToWrite = pHelper->config.maxRowsPerFileBlock * 4 / 5; // TODO: make a interface
|
||||||
|
|
||||||
rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfPoints), pDataCols->numOfPoints);
|
rowsWritten = MIN((defaultRowsToWrite - blockAtIdx(pHelper, blkIdx)->numOfPoints), pDataCols->numOfPoints);
|
||||||
|
@ -961,7 +961,7 @@ static int tsdbAdjustInfoSizeIfNeeded(SRWHelper *pHelper, size_t esize) {
|
||||||
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) {
|
static int tsdbInsertSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkIdx) {
|
||||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
||||||
|
|
||||||
ASSERT(blkIdx >= 0 && blkIdx <= pIdx->numOfSuperBlocks);
|
ASSERT(blkIdx >= 0 && blkIdx <= pIdx->numOfBlocks);
|
||||||
ASSERT(pCompBlock->numOfSubBlocks == 1);
|
ASSERT(pCompBlock->numOfSubBlocks == 1);
|
||||||
|
|
||||||
// Adjust memory if no more room
|
// Adjust memory if no more room
|
||||||
|
@ -1004,7 +1004,7 @@ static int tsdbAddSubBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int blkId
|
||||||
ASSERT(pCompBlock->numOfSubBlocks == 0);
|
ASSERT(pCompBlock->numOfSubBlocks == 0);
|
||||||
|
|
||||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
||||||
ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfSuperBlocks);
|
ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks);
|
||||||
|
|
||||||
SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
|
SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
|
||||||
ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS);
|
ASSERT(pSCompBlock->numOfSubBlocks >= 1 && pSCompBlock->numOfSubBlocks < TSDB_MAX_SUBBLOCKS);
|
||||||
|
@ -1088,7 +1088,7 @@ static int tsdbUpdateSuperBlock(SRWHelper *pHelper, SCompBlock *pCompBlock, int
|
||||||
|
|
||||||
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
SCompIdx *pIdx = pHelper->pCompIdx + pHelper->tableInfo.tid;
|
||||||
|
|
||||||
ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfSuperBlocks);
|
ASSERT(blkIdx >= 0 && blkIdx < pIdx->numOfBlocks);
|
||||||
|
|
||||||
SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
|
SCompBlock *pSCompBlock = pHelper->pCompInfo->blocks + blkIdx;
|
||||||
|
|
||||||
|
|
|
@ -105,9 +105,9 @@ TEST(TsdbTest, DISABLED_tableEncodeDecode) {
|
||||||
|
|
||||||
for (int i = 0; i < nCols; i++) {
|
for (int i = 0; i < nCols; i++) {
|
||||||
if (i == 0) {
|
if (i == 0) {
|
||||||
tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1);
|
tdSchemaAddCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1);
|
||||||
} else {
|
} else {
|
||||||
tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1);
|
tdSchemaAddCol(schema, TSDB_DATA_TYPE_INT, i, -1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -149,9 +149,9 @@ TEST(TsdbTest, createRepo) {
|
||||||
|
|
||||||
for (int i = 0; i < nCols; i++) {
|
for (int i = 0; i < nCols; i++) {
|
||||||
if (i == 0) {
|
if (i == 0) {
|
||||||
tdSchemaAppendCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1);
|
tdSchemaAddCol(schema, TSDB_DATA_TYPE_TIMESTAMP, i, -1);
|
||||||
} else {
|
} else {
|
||||||
tdSchemaAppendCol(schema, TSDB_DATA_TYPE_INT, i, -1);
|
tdSchemaAddCol(schema, TSDB_DATA_TYPE_INT, i, -1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -44,7 +44,7 @@ extern "C" {
|
||||||
|
|
||||||
#define tclose(x) taosCloseSocket(x)
|
#define tclose(x) taosCloseSocket(x)
|
||||||
|
|
||||||
#ifdef ASSERTION
|
#ifndef NDEBUG
|
||||||
#define ASSERT(x) assert(x)
|
#define ASSERT(x) assert(x)
|
||||||
#else
|
#else
|
||||||
#define ASSERT(x)
|
#define ASSERT(x)
|
||||||
|
|
|
@ -123,7 +123,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
|
||||||
|
|
||||||
STSchema *pDestSchema = tdNewSchema(numOfColumns);
|
STSchema *pDestSchema = tdNewSchema(numOfColumns);
|
||||||
for (int i = 0; i < numOfColumns; i++) {
|
for (int i = 0; i < numOfColumns; i++) {
|
||||||
tdSchemaAppendCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
tdSchemaAddCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
||||||
}
|
}
|
||||||
tsdbTableSetSchema(&tCfg, pDestSchema, false);
|
tsdbTableSetSchema(&tCfg, pDestSchema, false);
|
||||||
tsdbTableSetName(&tCfg, pTable->tableId, false);
|
tsdbTableSetName(&tCfg, pTable->tableId, false);
|
||||||
|
@ -131,7 +131,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
|
||||||
if (numOfTags != 0) {
|
if (numOfTags != 0) {
|
||||||
STSchema *pDestTagSchema = tdNewSchema(numOfTags);
|
STSchema *pDestTagSchema = tdNewSchema(numOfTags);
|
||||||
for (int i = numOfColumns; i < totalCols; i++) {
|
for (int i = numOfColumns; i < totalCols; i++) {
|
||||||
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
||||||
}
|
}
|
||||||
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
|
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
|
||||||
tsdbTableSetSName(&tCfg, pTable->superTableId, false);
|
tsdbTableSetSName(&tCfg, pTable->superTableId, false);
|
||||||
|
@ -141,7 +141,7 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
|
||||||
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
|
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
|
||||||
|
|
||||||
for (int i = 0; i < numOfTags; i++) {
|
for (int i = 0; i < numOfTags; i++) {
|
||||||
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i);
|
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema, i);
|
||||||
accumBytes += htons(pSchema[i + numOfColumns].bytes);
|
accumBytes += htons(pSchema[i + numOfColumns].bytes);
|
||||||
}
|
}
|
||||||
tsdbTableSetTagValue(&tCfg, dataRow, false);
|
tsdbTableSetTagValue(&tCfg, dataRow, false);
|
||||||
|
@ -188,14 +188,14 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
|
||||||
|
|
||||||
STSchema *pDestSchema = tdNewSchema(numOfColumns);
|
STSchema *pDestSchema = tdNewSchema(numOfColumns);
|
||||||
for (int i = 0; i < numOfColumns; i++) {
|
for (int i = 0; i < numOfColumns; i++) {
|
||||||
tdSchemaAppendCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
tdSchemaAddCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
||||||
}
|
}
|
||||||
tsdbTableSetSchema(&tCfg, pDestSchema, false);
|
tsdbTableSetSchema(&tCfg, pDestSchema, false);
|
||||||
|
|
||||||
if (numOfTags != 0) {
|
if (numOfTags != 0) {
|
||||||
STSchema *pDestTagSchema = tdNewSchema(numOfTags);
|
STSchema *pDestTagSchema = tdNewSchema(numOfTags);
|
||||||
for (int i = numOfColumns; i < totalCols; i++) {
|
for (int i = numOfColumns; i < totalCols; i++) {
|
||||||
tdSchemaAppendCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes));
|
||||||
}
|
}
|
||||||
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
|
tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false);
|
||||||
|
|
||||||
|
@ -204,7 +204,7 @@ static int32_t vnodeProcessAlterTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
|
||||||
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
|
SDataRow dataRow = tdNewDataRowFromSchema(pDestTagSchema);
|
||||||
|
|
||||||
for (int i = 0; i < numOfTags; i++) {
|
for (int i = 0; i < numOfTags; i++) {
|
||||||
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema->columns + i);
|
tdAppendColVal(dataRow, pTagData + accumBytes, pDestTagSchema, i);
|
||||||
accumBytes += htons(pSchema[i + numOfColumns].bytes);
|
accumBytes += htons(pSchema[i + numOfColumns].bytes);
|
||||||
}
|
}
|
||||||
tsdbTableSetTagValue(&tCfg, dataRow, false);
|
tsdbTableSetTagValue(&tCfg, dataRow, false);
|
||||||
|
|
Loading…
Reference in New Issue