refactor and add more code
This commit is contained in:
parent
c8ed7d1569
commit
e80611c18a
|
@ -69,31 +69,35 @@ void tdUpdateSchema(STSchema *pSchema);
|
|||
// ----------------- Data row structure
|
||||
|
||||
/* A data row, the format is like below:
|
||||
* +---------+---------------------------------+
|
||||
* | int32_t | |
|
||||
* +---------+---------------------------------+
|
||||
* | len | row |
|
||||
* +---------+---------------------------------+
|
||||
* +----------+---------+---------------------------------+---------------------------------+
|
||||
* | int32_t | int32_t | | |
|
||||
* +----------+---------+---------------------------------+---------------------------------+
|
||||
* | len | flen | First part | Second part |
|
||||
* +----------+---------+---------------------------------+---------------------------------+
|
||||
* plen: first part length
|
||||
* len: the length including sizeof(row) + sizeof(len)
|
||||
* row: actual row data encoding
|
||||
*/
|
||||
typedef void *SDataRow;
|
||||
|
||||
#define TD_DATA_ROW_HEAD_SIZE sizeof(int32_t)
|
||||
#define TD_DATA_ROW_HEAD_SIZE (2 * sizeof(int32_t))
|
||||
|
||||
#define dataRowLen(r) (*(int32_t *)(r))
|
||||
#define dataRowFLen(r) (*(int32_t *)((char *)(r) + sizeof(int32_t)))
|
||||
#define dataRowTuple(r) ((char *)(r) + TD_DATA_ROW_HEAD_SIZE)
|
||||
#define dataRowSetLen(r, l) (dataRowLen(r) = (l))
|
||||
#define dataRowSetFLen(r, l) (dataRowFLen(r) = (l))
|
||||
#define dataRowIdx(r, i) ((char *)(r) + i)
|
||||
#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r))
|
||||
#define dataRowAt(r, idx) ((char *)(r) + (idx))
|
||||
|
||||
SDataRow tdNewDataRow(int32_t bytes);
|
||||
void tdInitDataRow(SDataRow row, STSchema *pSchema);
|
||||
int tdMaxRowBytesFromSchema(STSchema *pSchema);
|
||||
SDataRow tdNewDataRow(int32_t bytes, STSchema *pSchema);
|
||||
SDataRow tdNewDataRowFromSchema(STSchema *pSchema);
|
||||
void tdFreeDataRow(SDataRow row);
|
||||
int tdAppendColVal(SDataRow row, void *value, STColumn *pCol, int32_t suffixOffset);
|
||||
void tdDataRowCpy(void *dst, SDataRow row);
|
||||
void tdDataRowReset(SDataRow row);
|
||||
int tdAppendColVal(SDataRow row, void *value, STColumn *pCol);
|
||||
void tdDataRowReset(SDataRow row, STSchema *pSchema);
|
||||
SDataRow tdDataRowDup(SDataRow row);
|
||||
|
||||
/* Data rows definition, the format of it is like below:
|
||||
|
|
|
@ -14,6 +14,8 @@
|
|||
*/
|
||||
#include "dataformat.h"
|
||||
|
||||
static int tdFLenFromSchema(STSchema *pSchema);
|
||||
|
||||
/**
|
||||
* Create a new STColumn object
|
||||
* ASSUMPTIONS: VALID PARAMETERS
|
||||
|
@ -157,6 +159,14 @@ void tdUpdateSchema(STSchema *pSchema) {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize a data row
|
||||
*/
|
||||
void tdInitDataRow(SDataRow row, STSchema *pSchema) {
|
||||
dataRowSetFLen(row, TD_DATA_ROW_HEAD_SIZE);
|
||||
dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + tdFLenFromSchema(pSchema));
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a data row with maximum row length bytes.
|
||||
*
|
||||
|
@ -167,13 +177,13 @@ void tdUpdateSchema(STSchema *pSchema) {
|
|||
* @return SDataRow object for success
|
||||
* NULL for failure
|
||||
*/
|
||||
SDataRow tdNewDataRow(int32_t bytes) {
|
||||
SDataRow tdNewDataRow(int32_t bytes, STSchema *pSchema) {
|
||||
int32_t size = sizeof(int32_t) + bytes;
|
||||
|
||||
SDataRow row = malloc(size);
|
||||
if (row == NULL) return NULL;
|
||||
|
||||
dataRowSetLen(row, sizeof(int32_t));
|
||||
tdInitDataRow(row, pSchema);
|
||||
|
||||
return row;
|
||||
}
|
||||
|
@ -197,14 +207,7 @@ int tdMaxRowBytesFromSchema(STSchema *pSchema) {
|
|||
return bytes;
|
||||
}
|
||||
|
||||
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) {
|
||||
int bytes = 0;
|
||||
{
|
||||
// TODO: estimiate size from schema
|
||||
}
|
||||
|
||||
return tdNewDataRow(bytes);
|
||||
}
|
||||
SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { return tdNewDataRow(tdMaxRowBytesFromSchema(pSchema), pSchema); }
|
||||
|
||||
/**
|
||||
* Free the SDataRow object
|
||||
|
@ -214,62 +217,37 @@ void tdFreeDataRow(SDataRow row) {
|
|||
}
|
||||
|
||||
/**
|
||||
* Append a column value to a SDataRow object.
|
||||
* NOTE: THE APPLICATION SHOULD MAKE SURE VALID PARAMETERS. THE FUNCTION ASSUMES
|
||||
* THE ROW OBJECT HAS ENOUGH SPACE TO HOLD THE VALUE.
|
||||
*
|
||||
* @param row the row to append value to
|
||||
* @param value value pointer to append
|
||||
* @param pSchema schema
|
||||
* @param colIdx column index
|
||||
*
|
||||
* @return 0 for success and -1 for failure
|
||||
* Append a column value to the data row
|
||||
*/
|
||||
// int32_t tdAppendColVal(SDataRow row, void *value, SColumn *pCol, int32_t suffixOffset) {
|
||||
// int32_t offset;
|
||||
int tdAppendColVal(SDataRow row, void *value, STColumn *pCol) {
|
||||
switch (colType(pCol))
|
||||
{
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
*(int32_t *)dataRowAt(row, dataRowFLen(row)) = dataRowLen(row);
|
||||
dataRowFLen(row) += TYPE_BYTES[colType(pCol)];
|
||||
memcpy((void *)dataRowAt(row, dataRowLen(row)), value, strlen(value));
|
||||
dataRowLen(row) += strlen(value);
|
||||
break;
|
||||
default:
|
||||
memcpy(dataRowAt(row, dataRowFLen(row)), value, TYPE_BYTES[colType(pCol)]);
|
||||
dataRowFLen(row) += TYPE_BYTES[colType(pCol)];
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
// switch (pCol->type) {
|
||||
// case TD_DATATYPE_BOOL:
|
||||
// case TD_DATATYPE_TINYINT:
|
||||
// case TD_DATATYPE_SMALLINT:
|
||||
// case TD_DATATYPE_INT:
|
||||
// case TD_DATATYPE_BIGINT:
|
||||
// case TD_DATATYPE_FLOAT:
|
||||
// case TD_DATATYPE_DOUBLE:
|
||||
// case TD_DATATYPE_TIMESTAMP:
|
||||
// memcpy(dataRowIdx(row, pCol->offset + sizeof(int32_t)), value, rowDataLen[pCol->type]);
|
||||
// if (dataRowLen(row) < suffixOffset + sizeof(int32_t))
|
||||
// dataRowSetLen(row, dataRowLen(row) + rowDataLen[pCol->type]);
|
||||
// break;
|
||||
// case TD_DATATYPE_VARCHAR:
|
||||
// offset = dataRowLen(row) > suffixOffset ? dataRowLen(row) : suffixOffset;
|
||||
// memcpy(dataRowIdx(row, pCol->offset+sizeof(int32_t)), (void *)(&offset), sizeof(offset));
|
||||
// case TD_DATATYPE_NCHAR:
|
||||
// case TD_DATATYPE_BINARY:
|
||||
// break;
|
||||
// default:
|
||||
// return -1;
|
||||
// }
|
||||
void tdDataRowReset(SDataRow row, STSchema *pSchema) { tdInitDataRow(row, pSchema); }
|
||||
|
||||
// return 0;
|
||||
// }
|
||||
|
||||
/**
|
||||
* Copy a data row to a destination
|
||||
* ASSUMPTIONS: dst has enough room for a copy of row
|
||||
*/
|
||||
void tdDataRowCpy(void *dst, SDataRow row) { memcpy(dst, row, dataRowLen(row)); }
|
||||
void tdDataRowReset(SDataRow row) { dataRowSetLen(row, sizeof(int32_t)); }
|
||||
SDataRow tdDataRowDup(SDataRow row) {
|
||||
SDataRow trow = tdNewDataRow(dataRowLen(row));
|
||||
SDataRow trow = malloc(dataRowLen(row));
|
||||
if (trow == NULL) return NULL;
|
||||
|
||||
dataRowCpy(trow, row);
|
||||
return row;
|
||||
return trow;
|
||||
}
|
||||
|
||||
void tdDataRowsAppendRow(SDataRows rows, SDataRow row) {
|
||||
tdDataRowCpy((void *)((char *)rows + dataRowsLen(rows)), row);
|
||||
dataRowCpy((void *)((char *)rows + dataRowsLen(rows)), row);
|
||||
dataRowsSetLen(rows, dataRowsLen(rows) + dataRowLen(row));
|
||||
}
|
||||
|
||||
|
@ -300,4 +278,17 @@ SDataRow tdDataRowsNext(SDataRowsIter *pIter) {
|
|||
}
|
||||
|
||||
return row;
|
||||
}
|
||||
|
||||
/**
|
||||
* Return the first part length of a data row for a schema
|
||||
*/
|
||||
static int tdFLenFromSchema(STSchema *pSchema) {
|
||||
int ret = 0;
|
||||
for (int i = 0; i < schemaNCols(pSchema); i++) {
|
||||
STColumn *pCol = schemaColAt(pSchema, i);
|
||||
ret += TYPE_BYTES[pCol->type];
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
|
@ -96,7 +96,7 @@ typedef struct {
|
|||
STableId tableId;
|
||||
int32_t padding; // TODO just for padding here
|
||||
int32_t sversion; // data schema version
|
||||
int32_t len; // message length
|
||||
int32_t len; // data part length, not including the SSubmitBlk head
|
||||
char data[];
|
||||
} SSubmitBlk;
|
||||
|
||||
|
|
|
@ -621,7 +621,7 @@ static int32_t tdInsertRowToTable(STsdbRepo *pRepo, SDataRow row, STable *pTable
|
|||
}
|
||||
|
||||
pNode->level = level;
|
||||
tdDataRowCpy(SL_GET_NODE_DATA(pNode), row);
|
||||
dataRowCpy(SL_GET_NODE_DATA(pNode), row);
|
||||
|
||||
// Insert the skiplist node into the data
|
||||
tsdbInsertRowToTableImpl(pNode, pTable);
|
||||
|
|
|
@ -32,15 +32,29 @@ TEST(TsdbTest, createRepo) {
|
|||
|
||||
tsdbCreateTable(pRepo, &tCfg);
|
||||
|
||||
// 3. Loop to write some simple data
|
||||
// int size = tdMaxRowBytesFromSchema(schema);
|
||||
// int nrows = 100;
|
||||
// SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk+ size * nrows);
|
||||
// // 3. Loop to write some simple data
|
||||
// int nRows = 10;
|
||||
// SSubmitMsg *pMsg = (SSubmitMsg *)malloc(sizeof(SSubmitMsg) + sizeof(SSubmitBlk) + tdMaxRowBytesFromSchema(schema) * nRows);
|
||||
|
||||
// {
|
||||
// // TODO
|
||||
// SSubmitBlk *pBlock = pMsg->blocks;
|
||||
// pBlock->tableId = {.uid = 987607499877672L, .tid = 0};
|
||||
// pBlock->sversion = 0;
|
||||
// pBlock->len = 0;
|
||||
// int64_t start_time = 1584081000000;
|
||||
// for (int i = 0; i < nRows; i++) {
|
||||
// int64_t ttime = start_time + 1000 * i;
|
||||
// SDataRow row = (SDataRow)(pBlock->data + pBlock->len);
|
||||
// dataRowInit(row);
|
||||
|
||||
// for (int j; j < schemaNCols(schema); j++) {
|
||||
// if (j == 0) { // Just for timestamp
|
||||
// tdAppendColVal(row, (void *)(&time), schemaColAt(schema, i), );
|
||||
// } else { // For int
|
||||
|
||||
// }
|
||||
// }
|
||||
|
||||
// pBlock->len += dataRowLen(row);
|
||||
// }
|
||||
|
||||
// tsdbInsertData(pRepo, pMsg);
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue