diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 528e9b2825..1d3319e61d 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -19,6 +19,7 @@ #include #include +#include "talgo.h" #include "taosdef.h" #include "tutil.h" @@ -26,19 +27,24 @@ extern "C" { #endif -#define STR_TO_VARSTR(x, str) do {VarDataLenT __len = strlen(str); \ - *(VarDataLenT*)(x) = __len; \ - strncpy(varDataVal(x), (str), __len);} while(0); +#define STR_TO_VARSTR(x, str) \ + do { \ + VarDataLenT __len = strlen(str); \ + *(VarDataLenT *)(x) = __len; \ + strncpy(varDataVal(x), (str), __len); \ + } while (0); -#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) do {\ - char* _e = stpncpy(varDataVal(x), (str), (_maxs));\ - varDataSetLen(x, (_e - (x) - VARSTR_HEADER_SIZE));\ -} while(0) +#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \ + do { \ + char *_e = stpncpy(varDataVal(x), (str), (_maxs)); \ + varDataSetLen(x, (_e - (x)-VARSTR_HEADER_SIZE)); \ + } while (0) -#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) do {\ - *(VarDataLenT*)(x) = (_size); \ - strncpy(varDataVal(x), (str), (_size));\ -} while(0); +#define STR_WITH_SIZE_TO_VARSTR(x, str, _size) \ + do { \ + *(VarDataLenT *)(x) = (_size); \ + strncpy(varDataVal(x), (str), (_size)); \ + } while (0); // ----------------- TSDB COLUMN DEFINITION typedef struct { @@ -72,15 +78,31 @@ typedef struct { #define schemaTLen(s) ((s)->tlen) #define schemaFLen(s) ((s)->flen) #define schemaColAt(s, i) ((s)->columns + i) +#define tdFreeSchema(s) tfree((s)) STSchema *tdNewSchema(int32_t nCols); -#define tdFreeSchema(s) tfree((s)) int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes); STSchema *tdDupSchema(STSchema *pSchema); int tdGetSchemaEncodeSize(STSchema *pSchema); void * tdEncodeSchema(void *dst, STSchema *pSchema); STSchema *tdDecodeSchema(void **psrc); +static FORCE_INLINE int comparColId(const void *key1, const void *key2) { + if (*(int16_t *)key1 > ((STColumn *)key2)->colId) { + return 1; + } else if (*(int16_t *)key1 < ((STColumn *)key2)->colId) { + return -1; + } else { + return 0; + } +} + +static FORCE_INLINE STColumn *tdGetColOfID(STSchema *pSchema, int16_t colId) { + void *ptr = bsearch(&colId, (void *)pSchema->columns, schemaNCols(pSchema), sizeof(STColumn), comparColId); + if (ptr == NULL) return NULL; + return (STColumn *)ptr; +} + // ----------------- Data row structure /* A data row, the format is like below: @@ -188,12 +210,11 @@ static FORCE_INLINE int32_t dataColGetNEleLen(SDataCol *pDataCol, int rows) { } } - typedef struct { - int maxRowSize; - int maxCols; // max number of columns - int maxPoints; // max number of points - int bufSize; + int maxRowSize; + int maxCols; // max number of columns + int maxPoints; // max number of points + int bufSize; int numOfRows; int numOfCols; // Total number of cols @@ -213,62 +234,102 @@ void tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); void tdFreeDataCols(SDataCols *pCols); void tdAppendDataRowToDataCol(SDataRow row, SDataCols *pCols); -void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!! +void tdPopDataColsPoints(SDataCols *pCols, int pointsToPop); //!!!! int tdMergeDataCols(SDataCols *target, SDataCols *src, int rowsToMerge); void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCols *src2, int *iter2, int tRows); - -// ----------------- Tag row structure - -/* A tag row, the format is like below: -+----------+----------------------------------------------------------------+ -| STagRow | STagCol | STagCol | STagCol | STagCol | ...| STagCol | STagCol | -+----------+----------------------------------------------------------------+ - -pData -+----------+----------------------------------------------------------------+ -| value 1 | value 2 | value 3 | value 4 | ....|value n | -+----------+----------------------------------------------------------------+ - +// ----------------- K-V data row structure +/* + * +----------+----------+---------------------------------+---------------------------------+ + * | int16_t | int16_t | | | + * +----------+----------+---------------------------------+---------------------------------+ + * | len | ncols | cols index | data part | + * +----------+----------+---------------------------------+---------------------------------+ */ - - -#define TD_TAG_ROW_HEAD_SIZE sizeof(int16_t) - -#define tagRowNum(r) (*(int16_t *)(r)) -#define tagRowArray(r) POINTER_SHIFT(r, TD_TAG_ROW_HEAD_SIZE) -//#define dataRowKey(r) (*(TSKEY *)(dataRowTuple(r))) -//#define dataRowSetLen(r, l) (dataRowLen(r) = (l)) -//#define dataRowCpy(dst, r) memcpy((dst), (r), dataRowLen(r)) -//#define dataRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_DATA_ROW_HEAD_SIZE) +typedef void *SKVRow; typedef struct { - int16_t colId; // column ID - int16_t colType; - uint16_t offset; //to store value for numeric col or offset for binary/Nchar -} STagCol; + int16_t colId; + int16_t offset; +} SColIdx; +#define TD_KV_ROW_HEAD_SIZE 2 * sizeof(int16_t) + +#define kvRowLen(r) (*(int16_t *)(r)) +#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))) +#define kvRowSetLen(r, len) kvRowLen(r) = (len) +#define kvRowSetNCols(r, n) kvRowNCols(r) = (n) +#define kvRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE) +#define kvRowValues(r) POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * kvRowNCols(r)) +#define kvRowCpy(dst, r) memcpy((dst), (r), kvRowLen(r)) +#define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset) +#define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i)) +#define kvRowFree(r) tfree(r) + +SKVRow tdKVRowDup(SKVRow row); +SKVRow tdSetKVRowDataOfCol(SKVRow row, int16_t colId, int8_t type, void *value); +void * tdEncodeKVRow(void *buf, SKVRow row); +void * tdDecodeKVRow(void *buf, SKVRow *row); + +static FORCE_INLINE int comparTagId(const void *key1, const void *key2) { + if (*(int16_t *)key1 > ((SColIdx *)key2)->colId) { + return 1; + } else if (*(int16_t *)key1 < ((SColIdx *)key2)->colId) { + return -1; + } else { + return 0; + } +} + +static FORCE_INLINE void *tdGetKVRowValOfCol(SKVRow row, int16_t colId) { + void *ret = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ); + if (ret == NULL) return NULL; + return kvRowColVal(row, (SColIdx *)ret); +} + +// ----------------- K-V data row builder typedef struct { - int32_t len; - void * pData; // Space to store the tag value - uint16_t dataLen; - int16_t ncols; // Total columns allocated - STagCol tagCols[]; -} STagRow; + int16_t tCols; + int16_t nCols; + SColIdx *pColIdx; + int16_t alloc; + int16_t size; + void * buf; +} SKVRowBuilder; +int tdInitKVRowBuilder(SKVRowBuilder *pBuilder); +void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder); +void tdResetKVRowBuilder(SKVRowBuilder *pBuilder); +SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder); -#define tagColSize(r) (sizeof(STagCol) + r.colLen) +static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) { + ASSERT(pBuilder->nCols == 0 || colId > pBuilder->pColIdx[pBuilder->nCols - 1].colId); -int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, int16_t colId); //insert tag value and update all the information -int tdDeleteTagCol(SDataRow row, int16_t colId); // delete tag value and update all the information -void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type); //if find tag, 0, else return -1; -int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId); -SDataRow tdTagRowDup(SDataRow row); -void tdFreeTagRow(SDataRow row); -SDataRow tdTagRowDecode(SDataRow row); -int tdTagRowCpy(SDataRow dst, SDataRow src); -void * tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags); -STSchema *tdGetSchemaFromData(SDataRow *row); + if (pBuilder->nCols >= pBuilder->tCols) { + pBuilder->tCols *= 2; + pBuilder->pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols); + if (pBuilder->pColIdx == NULL) return -1; + } + + pBuilder->pColIdx[pBuilder->nCols].colId = colId; + pBuilder->pColIdx[pBuilder->nCols].offset = pBuilder->size; + + pBuilder->nCols++; + + int tlen = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type]; + if (tlen > pBuilder->alloc - pBuilder->size) { + while (tlen > pBuilder->alloc - pBuilder->size) { + pBuilder->alloc *= 2; + } + pBuilder->buf = realloc(pBuilder->buf, pBuilder->alloc); + if (pBuilder->buf == NULL) return -1; + } + + memcpy(POINTER_SHIFT(pBuilder->buf, pBuilder->size), value, tlen); + pBuilder->size += tlen; + + return 0; +} #ifdef __cplusplus } diff --git a/src/common/src/tdataformat.c b/src/common/src/tdataformat.c index e7935900d6..7a35d5fb69 100644 --- a/src/common/src/tdataformat.c +++ b/src/common/src/tdataformat.c @@ -13,8 +13,8 @@ * along with this program. If not, see . */ #include "tdataformat.h" -#include "wchar.h" #include "talgo.h" +#include "wchar.h" /** * Create a SSchema object with nCols columns @@ -51,13 +51,13 @@ int tdSchemaAddCol(STSchema *pSchema, int8_t type, int16_t colId, int32_t bytes) if (schemaNCols(pSchema) == 0) { colSetOffset(pCol, 0); } else { - STColumn *pTCol = schemaColAt(pSchema, schemaNCols(pSchema)-1); + STColumn *pTCol = schemaColAt(pSchema, schemaNCols(pSchema) - 1); colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]); } switch (type) { case TSDB_DATA_TYPE_BINARY: case TSDB_DATA_TYPE_NCHAR: - colSetBytes(pCol, bytes); // Set as maximum bytes + colSetBytes(pCol, bytes); // Set as maximum bytes pSchema->tlen += (TYPE_BYTES[type] + sizeof(VarDataLenT) + bytes); break; default: @@ -152,152 +152,6 @@ SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { return row; } -int tdSetTagCol(SDataRow row, void *value, int16_t len, int8_t type, int16_t colId){ //insert/update tag value and update all the information - ASSERT(((STagRow *)row)->pData != NULL); - //STagCol * stCol = tdQueryTagColByID() - - return 0; -}; - -int tdDeleteTagCol(SDataRow row, int16_t colId){ // delete tag value and update all the information - //todo - return 0; -}; - -static int compTagId(const void *key1, const void *key2) { - if (((STagCol *)key1)->colId > ((STagCol *)key2)->colId) { - return 1; - } else if (((STagCol *)key1)->colId == ((STagCol *)key2)->colId) { - return 0; - } else { - return -1; - } -} - -/** - * Find tag structure by colId, if find, return tag structure, else return NULL; - */ -STagCol * tdQueryTagColByID(SDataRow row, int16_t colId, int flags) { //if find tag, 0, else return -1; - ASSERT(((STagRow *)row)->pData != NULL); - STagCol *pBase = ((STagRow *)row)->tagCols; - int16_t nCols = ((STagRow *)row)->ncols; - STagCol key = {colId,0,0}; - STagCol * stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, flags); - return stCol; -}; - -/** -* Find tag value by colId, if find, return tag value, else return NULL; -*/ -void * tdQueryTagByID(SDataRow row, int16_t colId, int16_t *type) { - ASSERT(((STagRow *)row)->pData != NULL); - STagCol *pBase = ((STagRow *)row)->tagCols; - int16_t nCols = ((STagRow *)row)->ncols; - STagCol key = {colId,0,0}; - STagCol * stCol = taosbsearch(&key, pBase, nCols, sizeof(STagCol), compTagId, TD_EQ); - if (NULL == stCol) { - type = TSDB_DATA_TYPE_NULL; - return NULL; - } - - void * pData = ((STagRow *)row)->pData; - *type = stCol->colType; - - return pData + stCol->offset; -}; - -int tdAppendTagColVal(SDataRow row, void *value, int8_t type, int32_t bytes, int16_t colId){ - ASSERT(value != NULL); - //ASSERT(bytes-2 == varDataTLen(value)); - ASSERT(row != NULL); - STagRow *pTagrow = row; - pTagrow->tagCols[pTagrow->ncols].colId = colId; - pTagrow->tagCols[pTagrow->ncols].colType = type; - pTagrow->tagCols[pTagrow->ncols].offset = pTagrow->dataLen; - - switch (type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: - memcpy((char *)pTagrow->pData + pTagrow->dataLen, value, varDataTLen(value)); - pTagrow->dataLen += varDataTLen(value); - break; - default: - memcpy((char *)pTagrow->pData + pTagrow->dataLen, value, TYPE_BYTES[type]); - pTagrow->dataLen += TYPE_BYTES[type]; - break; - } - - pTagrow->ncols++; - - return 0; -}; - -void * tdNewTagRowFromSchema(STSchema *pSchema, int16_t numofTags) { - int32_t size = sizeof(STagRow) + numofTags * sizeof(STagCol); - - STagRow *row = malloc(size); - if (row == NULL) return NULL; - - int32_t datasize = pSchema->tlen; - row->pData = malloc(datasize); - if (NULL == row->pData) { - free(row); - return NULL; - } - - row->len = size; - row->dataLen = 0; - row->ncols = 0; - return row; -} -/** - * free tag row - */ - -void tdFreeTagRow(SDataRow row) { - if (row) { - free(((STagRow *)row)->pData); - free(row); - } -} - -SDataRow tdTagRowDup(SDataRow row) { - STagRow *trow = malloc(dataRowLen(row)); - if (trow == NULL) return NULL; - - dataRowCpy(trow, row); - trow->pData = malloc(trow->dataLen); - if (NULL == trow->pData) { - free(trow); - return NULL; - } - memcpy(trow->pData, ((STagRow *)row)->pData, trow->dataLen); - return trow; -} - -SDataRow tdTagRowDecode(SDataRow row) { - STagRow *trow = malloc(dataRowLen(row)); - if (trow == NULL) return NULL; - - dataRowCpy(trow, row); - trow->pData = malloc(trow->dataLen); - if (NULL == trow->pData) { - free(trow); - return NULL; - } - char * pData = (char *)row + dataRowLen(row); - memcpy(trow->pData, pData, trow->dataLen); - return trow; -} - -int tdTagRowCpy(SDataRow dst, SDataRow src) { - if (src == NULL) return -1; - - dataRowCpy(dst, src); - void * pData = dst + dataRowLen(src); - memcpy(pData, ((STagRow *)src)->pData, ((STagRow *)src)->dataLen); - return 0; -} /** * Free the SDataRow object */ @@ -331,7 +185,6 @@ void dataColInit(SDataCol *pDataCol, STColumn *pCol, void **pBuf, int maxPoints) pDataCol->pData = *pBuf; *pBuf = POINTER_SHIFT(*pBuf, pDataCol->spaceSize); } - } void dataColAppendVal(SDataCol *pCol, void *value, int numOfRows, int maxPoints) { @@ -415,7 +268,7 @@ void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints) { void dataColSetOffset(SDataCol *pCol, int nEle) { ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR))); - void * tptr = pCol->pData; + void *tptr = pCol->pData; // char *tptr = (char *)(pCol->pData); VarDataOffsetT offset = 0; @@ -595,4 +448,131 @@ void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, SDataCol (*iter2)++; } } +} + +SKVRow tdKVRowDup(SKVRow row) { + SKVRow trow = malloc(kvRowLen(row)); + if (trow == NULL) return NULL; + + kvRowCpy(trow, row); + return trow; +} + +SKVRow tdSetKVRowDataOfCol(SKVRow row, int16_t colId, int8_t type, void *value) { + // TODO + return NULL; + // SColIdx *pColIdx = NULL; + // SKVRow rrow = row; + // SKVRow nrow = NULL; + // void *ptr = taosbsearch(&colId, kvDataRowColIdx(row), kvDataRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE); + + // if (ptr == NULL || ((SColIdx *)ptr)->colId < colId) { // need to add a column value to the row + // int tlen = kvDataRowLen(row) + sizeof(SColIdx) + (IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : + // TYPE_BYTES[type]); nrow = malloc(tlen); if (nrow == NULL) return NULL; + + // kvDataRowSetNCols(nrow, kvDataRowNCols(row)+1); + // kvDataRowSetLen(nrow, tlen); + + // if (ptr == NULL) ptr = kvDataRowValues(row); + + // // Copy the columns before the col + // if (POINTER_DISTANCE(ptr, kvDataRowColIdx(row)) > 0) { + // memcpy(kvDataRowColIdx(nrow), kvDataRowColIdx(row), POINTER_DISTANCE(ptr, kvDataRowColIdx(row))); + // memcpy(kvDataRowValues(nrow), kvDataRowValues(row), ((SColIdx *)ptr)->offset); // TODO: here is not correct + // } + + // // Set the new col value + // pColIdx = (SColIdx *)POINTER_SHIFT(nrow, POINTER_DISTANCE(ptr, row)); + // pColIdx->colId = colId; + // pColIdx->offset = ((SColIdx *)ptr)->offset; // TODO: here is not correct + + // if (IS_VAR_DATA_TYPE(type)) { + // memcpy(POINTER_SHIFT(kvDataRowValues(nrow), pColIdx->offset), value, varDataLen(value)); + // } else { + // memcpy(POINTER_SHIFT(kvDataRowValues(nrow), pColIdx->offset), value, TYPE_BYTES[type]); + // } + + // // Copy the columns after the col + // if (POINTER_DISTANCE(kvDataRowValues(row), ptr) > 0) { + // // TODO: memcpy(); + // } + // } else { + // // TODO + // ASSERT(((SColIdx *)ptr)->colId == colId); + // if (IS_VAR_DATA_TYPE(type)) { + // void *pOldVal = kvDataRowColVal(row, (SColIdx *)ptr); + + // if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place + // memcpy(pOldVal, value, varDataTLen(value)); + // } else { // enlarge the memory + // // rrow = realloc(rrow, kvDataRowLen(rrow) + varDataTLen(value) - varDataTLen(pOldVal)); + // // if (rrow == NULL) return NULL; + // // memmove(); + // // for () { + // // ((SColIdx *)ptr)->offset += balabala; + // // } + + // // kvDataRowSetLen(); + + // } + // } else { + // memcpy(kvDataRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]); + // } + // } + + // return rrow; +} + +void *tdEncodeKVRow(void *buf, SKVRow row) { + // May change the encode purpose + kvRowCpy(buf, row); + return POINTER_SHIFT(buf, kvRowLen(row)); +} + +void *tdDecodeKVRow(void *buf, SKVRow *row) { + *row = tdKVRowDup(buf); + return POINTER_SHIFT(buf, kvRowLen(*row)); +} + +int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) { + pBuilder->tCols = 128; + pBuilder->nCols = 0; + pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols); + if (pBuilder->pColIdx == NULL) return -1; + pBuilder->alloc = 1024; + pBuilder->size = 0; + pBuilder->buf = malloc(pBuilder->alloc); + if (pBuilder->buf == NULL) { + free(pBuilder->pColIdx); + return -1; + } + return 0; +} + +void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) { + tfree(pBuilder->pColIdx); + tfree(pBuilder->buf); +} + +void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) { + pBuilder->nCols = 0; + pBuilder->size = 0; +} + +SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { + int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size; + if (tlen == 0) return NULL; + + tlen += TD_KV_ROW_HEAD_SIZE; + + SKVRow row = malloc(tlen); + if (row == NULL) return NULL; + + kvRowSetNCols(row, pBuilder->nCols); + kvRowSetLen(row, tlen); + + memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols); + memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size); + + return row; } \ No newline at end of file diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index bdf61208a3..2aca057ba7 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -52,6 +52,7 @@ typedef struct tstr { #define varDataCopy(dst, v) memcpy((dst), (void*) (v), varDataTLen(v)) #define varDataLenByData(v) (*(VarDataLenT *)(((char*)(v)) - VARSTR_HEADER_SIZE)) #define varDataSetLen(v, _len) (((VarDataLenT *)(v))[0] = (VarDataLenT) (_len)) +#define IS_VAR_DATA_TYPE(t) (((t) == TSDB_DATA_TYPE_BINARY) || ((t) == TSDB_DATA_TYPE_NCHAR)) // this data type is internally used only in 'in' query to hold the values #define TSDB_DATA_TYPE_ARRAY (TSDB_DATA_TYPE_NCHAR + 1) diff --git a/src/inc/tsdb.h b/src/inc/tsdb.h index f291da4665..3680a46d85 100644 --- a/src/inc/tsdb.h +++ b/src/inc/tsdb.h @@ -102,14 +102,15 @@ int tsdbInitTableCfg(STableCfg *config, ETableType type, uint64_t uid, int32_t int tsdbTableSetSuperUid(STableCfg *config, uint64_t uid); int tsdbTableSetSchema(STableCfg *config, STSchema *pSchema, bool dup); int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup); -int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup); +int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup); int tsdbTableSetName(STableCfg *config, char *name, bool dup); int tsdbTableSetSName(STableCfg *config, char *sname, bool dup); int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup); void tsdbClearTableCfg(STableCfg *config); -int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId* id, int32_t colId, int16_t *type, int16_t *bytes, char **val); -char* tsdbGetTableName(TsdbRepoT *repo, const STableId* id, int16_t* bytes); +int32_t tsdbGetTableTagVal(TsdbRepoT *repo, STableId *id, int32_t colId, int16_t *type, int16_t *bytes, char **val); +char * tsdbGetTableName(TsdbRepoT *repo, const STableId *id, int16_t *bytes); +STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg); int tsdbCreateTable(TsdbRepoT *repo, STableCfg *pCfg); int tsdbDropTable(TsdbRepoT *pRepo, STableId tableId); diff --git a/src/tsdb/inc/tsdbMain.h b/src/tsdb/inc/tsdbMain.h index 9dd5136c95..e1b85ae99d 100644 --- a/src/tsdb/inc/tsdbMain.h +++ b/src/tsdb/inc/tsdbMain.h @@ -76,7 +76,7 @@ typedef struct STable { int32_t sversion; STSchema * schema; STSchema * tagSchema; - SDataRow tagVal; + SKVRow tagVal; SMemTable * mem; SMemTable * imem; void * pIndex; // For TSDB_SUPER_TABLE, it is the skiplist index diff --git a/src/tsdb/src/tsdbFile.c b/src/tsdb/src/tsdbFile.c index b07f6eed7f..413b4cff13 100644 --- a/src/tsdb/src/tsdbFile.c +++ b/src/tsdb/src/tsdbFile.c @@ -94,7 +94,7 @@ static int tsdbInitFile(char *dataDir, int fid, const char *suffix, SFile *pFile if (!taosCheckChecksumWhole((uint8_t *)buf, TSDB_FILE_HEAD_SIZE)) return -1; void *pBuf = buf; - pBuf = taosDecodeFixed32(pBuf, &version); + pBuf = taosDecodeFixedU32(pBuf, &version); pBuf = tsdbDecodeSFileInfo(pBuf, &(pFile->info)); tsdbCloseFile(pFile); diff --git a/src/tsdb/src/tsdbMain.c b/src/tsdb/src/tsdbMain.c index 197718e154..e79873d1b9 100644 --- a/src/tsdb/src/tsdbMain.c +++ b/src/tsdb/src/tsdbMain.c @@ -510,11 +510,11 @@ int tsdbTableSetTagSchema(STableCfg *config, STSchema *pSchema, bool dup) { return 0; } -int tsdbTableSetTagValue(STableCfg *config, SDataRow row, bool dup) { +int tsdbTableSetTagValue(STableCfg *config, SKVRow row, bool dup) { if (config->type != TSDB_CHILD_TABLE) return -1; if (dup) { - config->tagValues = tdDataRowDup(row); + config->tagValues = tdKVRowDup(row); } else { config->tagValues = row; } @@ -561,7 +561,7 @@ int tsdbTableSetStreamSql(STableCfg *config, char *sql, bool dup) { void tsdbClearTableCfg(STableCfg *config) { if (config->schema) tdFreeSchema(config->schema); if (config->tagSchema) tdFreeSchema(config->tagSchema); - if (config->tagValues) tdFreeDataRow(config->tagValues); + if (config->tagValues) kvRowFree(config->tagValues); tfree(config->name); tfree(config->sname); tfree(config->sql); diff --git a/src/tsdb/src/tsdbMeta.c b/src/tsdb/src/tsdbMeta.c index 03394409a7..8d70789b67 100644 --- a/src/tsdb/src/tsdbMeta.c +++ b/src/tsdb/src/tsdbMeta.c @@ -47,8 +47,7 @@ void tsdbEncodeTable(STable *pTable, char *buf, int *contLen) { ptr = tdEncodeSchema(ptr, pTable->schema); ptr = tdEncodeSchema(ptr, pTable->tagSchema); } else if (pTable->type == TSDB_CHILD_TABLE) { - tdTagRowCpy(ptr, pTable->tagVal); - ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal) + ((STagRow *)pTable->tagVal)->dataLen); + ptr = tdEncodeKVRow(ptr, pTable->tagVal); } else { ptr = tdEncodeSchema(ptr, pTable->schema); } @@ -94,8 +93,7 @@ STable *tsdbDecodeTable(void *cont, int contLen) { pTable->schema = tdDecodeSchema(&ptr); pTable->tagSchema = tdDecodeSchema(&ptr); } else if (pTable->type == TSDB_CHILD_TABLE) { - pTable->tagVal = tdTagRowDecode(ptr); - ptr = POINTER_SHIFT(ptr, dataRowLen(pTable->tagVal) + ((STagRow *)pTable->tagVal)->dataLen); + ptr = tdDecodeKVRow(ptr, &pTable->tagVal); } else { pTable->schema = tdDecodeSchema(&ptr); } @@ -115,12 +113,9 @@ void tsdbFreeEncode(void *cont) { static char* getTagIndexKey(const void* pData) { STableIndexElem* elem = (STableIndexElem*) pData; - SDataRow row = elem->pTable->tagVal; STSchema* pSchema = tsdbGetTableTagSchema(elem->pMeta, elem->pTable); STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN]; - int16_t type = 0; - void * res = tdQueryTagByID(row, pCol->colId, &type); - ASSERT(type == pCol->type); + void * res = tdGetKVRowValOfCol(elem->pTable->tagVal, pCol->colId); return res; } @@ -255,19 +250,24 @@ STSchema * tsdbGetTableTagSchema(STsdbMeta *pMeta, STable *pTable) { int32_t tsdbGetTableTagVal(TsdbRepoT* repo, STableId* id, int32_t colId, int16_t* type, int16_t* bytes, char** val) { STsdbMeta* pMeta = tsdbGetMeta(repo); STable* pTable = tsdbGetTableByUid(pMeta, id->uid); + + STSchema *pSchema = tsdbGetTableTagSchema(pMeta, pTable); + STColumn *pCol = tdGetColOfID(pSchema, colId); + if (pCol == NULL) { + return -1; // No matched tag volumn + } - *val = tdQueryTagByID(pTable->tagVal, colId, type); + *val = tdGetKVRowValOfCol(pTable->tagVal, colId); + *type = pCol->type; if (*val != NULL) { - switch(*type) { - case TSDB_DATA_TYPE_BINARY: - case TSDB_DATA_TYPE_NCHAR: *bytes = varDataLen(*val); break; - case TSDB_DATA_TYPE_NULL: *bytes = 0; break; - default: - *bytes = tDataTypeDesc[*type].nSize;break; + if (IS_VAR_DATA_TYPE(*type)) { + *bytes = varDataLen(*val); + } else { + *bytes = TYPE_BYTES[*type]; } } - + return TSDB_CODE_SUCCESS; } @@ -341,7 +341,7 @@ static STable *tsdbNewTable(STableCfg *pCfg, bool isSuper) { if (pCfg->type == TSDB_CHILD_TABLE) { pTable->superUid = pCfg->superUid; - pTable->tagVal = tdDataRowDup(pCfg->tagValues); + pTable->tagVal = tdKVRowDup(pCfg->tagValues); } else if (pCfg->type == TSDB_NORMAL_TABLE) { pTable->superUid = -1; pTable->schema = tdDupSchema(pCfg->schema); @@ -438,6 +438,60 @@ STable *tsdbIsValidTableToInsert(STsdbMeta *pMeta, STableId tableId) { return pTable; } +STableCfg *tsdbCreateTableCfgFromMsg(SMDCreateTableMsg *pMsg) { + if (pMsg == NULL) return NULL; + SSchema *pSchema = (SSchema *)pMsg->data; + int16_t numOfCols = htons(pMsg->numOfColumns); + int16_t numOfTags = htons(pMsg->numOfTags); + + STableCfg *pCfg = (STableCfg *)calloc(1, sizeof(STableCfg)); + if (pCfg == NULL) return NULL; + + if (tsdbInitTableCfg(pCfg, pMsg->tableType, htobe64(pMsg->uid), htonl(pMsg->sid)) < 0) goto _err; + STSchema *pDSchema = tdNewSchema(numOfCols); + if (pDSchema == NULL) goto _err; + for (int i = 0; i < numOfCols; i++) { + tdSchemaAddCol(pDSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); + } + if (tsdbTableSetSchema(pCfg, pDSchema, false) < 0) goto _err; + if (tsdbTableSetName(pCfg, pMsg->tableId, true) < 0) goto _err; + + if (numOfTags > 0) { + STSchema *pTSchema = tdNewSchema(numOfTags); + for (int i = numOfCols; i < numOfCols + numOfTags; i++) { + tdSchemaAddCol(pTSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); + } + if (tsdbTableSetTagSchema(pCfg, pTSchema, false) < 0) goto _err; + if (tsdbTableSetSName(pCfg, pMsg->superTableId, true) < 0) goto _err; + if (tsdbTableSetSuperUid(pCfg, htobe64(pMsg->superTableUid)) < 0) goto _err; + + char * pTagData = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema); + int accBytes = 0; + SKVRowBuilder kvRowBuilder; + + if (tdInitKVRowBuilder(&kvRowBuilder) < 0) goto _err; + for (int i = 0; i < numOfTags; i++) { + STColumn *pCol = schemaColAt(pTSchema, i); + tdAddColToKVRow(&kvRowBuilder, pCol->colId, pCol->type, pTagData + accBytes); + accBytes += htons(pSchema[i+numOfCols].bytes); + } + tsdbTableSetTagValue(pCfg, tdGetKVRowFromBuilder(&kvRowBuilder), false); + tdDestroyKVRowBuilder(&kvRowBuilder); + } + + if (pMsg->tableType == TSDB_STREAM_TABLE) { + char *sql = pMsg->data + (numOfCols + numOfTags) * sizeof(SSchema); + tsdbTableSetStreamSql(pCfg, sql, true); + } + + return pCfg; + +_err: + tsdbClearTableCfg(pCfg); + tfree(pCfg); + return NULL; +} + // int32_t tsdbDropTableImpl(STsdbMeta *pMeta, STableId tableId) { int tsdbDropTable(TsdbRepoT *repo, STableId tableId) { STsdbRepo *pRepo = (STsdbRepo *)repo; @@ -478,7 +532,7 @@ static int tsdbFreeTable(STable *pTable) { if (pTable == NULL) return 0; if (pTable->type == TSDB_CHILD_TABLE) { - tdFreeTagRow(pTable->tagVal); + kvRowFree(pTable->tagVal); } else { tdFreeSchema(pTable->schema); } @@ -627,9 +681,7 @@ static int tsdbRemoveTableFromIndex(STsdbMeta *pMeta, STable *pTable) { STSchema* pSchema = tsdbGetTableTagSchema(pMeta, pTable); STColumn* pCol = &pSchema->columns[DEFAULT_TAG_INDEX_COLUMN]; - int16_t tagtype = 0; - char* key = tdQueryTagByID(pTable->tagVal, pCol->colId, &tagtype); - ASSERT(pCol->type == tagtype); + char* key = tdGetKVRowValOfCol(pTable->tagVal, pCol->colId); SArray* res = tSkipListGet(pSTable->pIndex, key); size_t size = taosArrayGetSize(res); diff --git a/src/tsdb/src/tsdbRWHelper.c b/src/tsdb/src/tsdbRWHelper.c index eeb212fe95..5bdf37c81e 100644 --- a/src/tsdb/src/tsdbRWHelper.c +++ b/src/tsdb/src/tsdbRWHelper.c @@ -237,20 +237,24 @@ int tsdbSetAndOpenHelperFile(SRWHelper *pHelper, SFileGroup *pGroup) { int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { if (pHelper->files.headF.fd > 0) { + fsync(pHelper->files.headF.fd); close(pHelper->files.headF.fd); pHelper->files.headF.fd = -1; } if (pHelper->files.dataF.fd > 0) { if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.dataF), 0); + fsync(pHelper->files.dataF.fd); close(pHelper->files.dataF.fd); pHelper->files.dataF.fd = -1; } if (pHelper->files.lastF.fd > 0) { + fsync(pHelper->files.lastF.fd); close(pHelper->files.lastF.fd); pHelper->files.lastF.fd = -1; } if (pHelper->files.nHeadF.fd > 0) { if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nHeadF), 0); + fsync(pHelper->files.nHeadF.fd); close(pHelper->files.nHeadF.fd); pHelper->files.nHeadF.fd = -1; if (hasError) { @@ -263,6 +267,7 @@ int tsdbCloseHelperFile(SRWHelper *pHelper, bool hasError) { if (pHelper->files.nLastF.fd > 0) { if (!hasError) tsdbUpdateFileHeader(&(pHelper->files.nLastF), 0); + fsync(pHelper->files.nLastF.fd); close(pHelper->files.nLastF.fd); pHelper->files.nLastF.fd = -1; if (hasError) { @@ -448,7 +453,7 @@ int tsdbWriteCompIdx(SRWHelper *pHelper) { pHelper->pBuffer = trealloc(pHelper->pBuffer, tsizeof(pHelper->pBuffer)*2); } buf = POINTER_SHIFT(pHelper->pBuffer, drift); - buf = taosEncodeVariant32(buf, i); + buf = taosEncodeVariantU32(buf, i); buf = tsdbEncodeSCompIdx(buf, pCompIdx); } } @@ -486,7 +491,7 @@ int tsdbLoadCompIdx(SRWHelper *pHelper, void *target) { void *ptr = pHelper->pBuffer; while (((char *)ptr - (char *)pHelper->pBuffer) < (pFile->info.len - sizeof(TSCKSUM))) { uint32_t tid = 0; - if ((ptr = taosDecodeVariant32(ptr, &tid)) == NULL) return -1; + if ((ptr = taosDecodeVariantU32(ptr, &tid)) == NULL) return -1; ASSERT(tid > 0 && tid < pHelper->config.maxTables); if ((ptr = tsdbDecodeSCompIdx(ptr, pHelper->pCompIdx + tid)) == NULL) return -1; @@ -1248,12 +1253,12 @@ static int tsdbGetRowsInRange(SDataCols *pDataCols, TSKEY minKey, TSKEY maxKey) } void *tsdbEncodeSCompIdx(void *buf, SCompIdx *pIdx) { - buf = taosEncodeVariant32(buf, pIdx->len); - buf = taosEncodeVariant32(buf, pIdx->offset); - buf = taosEncodeFixed8(buf, pIdx->hasLast); - buf = taosEncodeVariant32(buf, pIdx->numOfBlocks); - buf = taosEncodeFixed64(buf, pIdx->uid); - buf = taosEncodeFixed64(buf, pIdx->maxKey); + buf = taosEncodeVariantU32(buf, pIdx->len); + buf = taosEncodeVariantU32(buf, pIdx->offset); + buf = taosEncodeFixedU8(buf, pIdx->hasLast); + buf = taosEncodeVariantU32(buf, pIdx->numOfBlocks); + buf = taosEncodeFixedU64(buf, pIdx->uid); + buf = taosEncodeFixedU64(buf, pIdx->maxKey); return buf; } @@ -1263,15 +1268,15 @@ void *tsdbDecodeSCompIdx(void *buf, SCompIdx *pIdx) { uint32_t numOfBlocks = 0; uint64_t value = 0; - if ((buf = taosDecodeVariant32(buf, &(pIdx->len))) == NULL) return NULL; - if ((buf = taosDecodeVariant32(buf, &(pIdx->offset))) == NULL) return NULL; - if ((buf = taosDecodeFixed8(buf, &(hasLast))) == NULL) return NULL; + if ((buf = taosDecodeVariantU32(buf, &(pIdx->len))) == NULL) return NULL; + if ((buf = taosDecodeVariantU32(buf, &(pIdx->offset))) == NULL) return NULL; + if ((buf = taosDecodeFixedU8(buf, &(hasLast))) == NULL) return NULL; pIdx->hasLast = hasLast; - if ((buf = taosDecodeVariant32(buf, &(numOfBlocks))) == NULL) return NULL; + if ((buf = taosDecodeVariantU32(buf, &(numOfBlocks))) == NULL) return NULL; pIdx->numOfBlocks = numOfBlocks; - if ((buf = taosDecodeFixed64(buf, &value)) == NULL) return NULL; + if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL; pIdx->uid = (int64_t)value; - if ((buf = taosDecodeFixed64(buf, &value)) == NULL) return NULL; + if ((buf = taosDecodeFixedU64(buf, &value)) == NULL) return NULL; pIdx->maxKey = (TSKEY)value; return buf; @@ -1281,7 +1286,7 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) { char buf[TSDB_FILE_HEAD_SIZE] = "\0"; void *pBuf = (void *)buf; - pBuf = taosEncodeFixed32(pBuf, version); + pBuf = taosEncodeFixedU32(pBuf, version); pBuf = tsdbEncodeSFileInfo(pBuf, &(pFile->info)); taosCalcChecksumAppend(0, (uint8_t *)buf, TSDB_FILE_HEAD_SIZE); @@ -1295,23 +1300,23 @@ int tsdbUpdateFileHeader(SFile *pFile, uint32_t version) { void *tsdbEncodeSFileInfo(void *buf, const STsdbFileInfo *pInfo) { - buf = taosEncodeFixed32(buf, pInfo->offset); - buf = taosEncodeFixed32(buf, pInfo->len); - buf = taosEncodeFixed64(buf, pInfo->size); - buf = taosEncodeFixed64(buf, pInfo->tombSize); - buf = taosEncodeFixed32(buf, pInfo->totalBlocks); - buf = taosEncodeFixed32(buf, pInfo->totalSubBlocks); + buf = taosEncodeFixedU32(buf, pInfo->offset); + buf = taosEncodeFixedU32(buf, pInfo->len); + buf = taosEncodeFixedU64(buf, pInfo->size); + buf = taosEncodeFixedU64(buf, pInfo->tombSize); + buf = taosEncodeFixedU32(buf, pInfo->totalBlocks); + buf = taosEncodeFixedU32(buf, pInfo->totalSubBlocks); return buf; } void *tsdbDecodeSFileInfo(void *buf, STsdbFileInfo *pInfo) { - buf = taosDecodeFixed32(buf, &(pInfo->offset)); - buf = taosDecodeFixed32(buf, &(pInfo->len)); - buf = taosDecodeFixed64(buf, &(pInfo->size)); - buf = taosDecodeFixed64(buf, &(pInfo->tombSize)); - buf = taosDecodeFixed32(buf, &(pInfo->totalBlocks)); - buf = taosDecodeFixed32(buf, &(pInfo->totalSubBlocks)); + buf = taosDecodeFixedU32(buf, &(pInfo->offset)); + buf = taosDecodeFixedU32(buf, &(pInfo->len)); + buf = taosDecodeFixedU64(buf, &(pInfo->size)); + buf = taosDecodeFixedU64(buf, &(pInfo->tombSize)); + buf = taosDecodeFixedU32(buf, &(pInfo->totalBlocks)); + buf = taosDecodeFixedU32(buf, &(pInfo->totalSubBlocks)); return buf; } \ No newline at end of file diff --git a/src/tsdb/src/tsdbRead.c b/src/tsdb/src/tsdbRead.c index 1e03794045..46cc897eac 100644 --- a/src/tsdb/src/tsdbRead.c +++ b/src/tsdb/src/tsdbRead.c @@ -1909,9 +1909,8 @@ int32_t tableGroupComparFn(const void *p1, const void *p2, const void *param) { STColumn* pCol = schemaColAt(pTableGroupSupp->pTagSchema, colIndex); bytes = pCol->bytes; type = pCol->type; - int16_t tgtype1, tgtype2 = 0; - f1 = tdQueryTagByID(pTable1->tagVal, pCol->colId, &tgtype1); - f2 = tdQueryTagByID(pTable2->tagVal, pCol->colId, &tgtype2); + f1 = tdGetKVRowValOfCol(pTable1->tagVal, pCol->colId); + f2 = tdGetKVRowValOfCol(pTable2->tagVal, pCol->colId); } int32_t ret = doCompare(f1, f2, type, bytes); @@ -1999,9 +1998,7 @@ bool indexedNodeFilterFp(const void* pNode, void* param) { val = (char*) elem->pTable->name; type = TSDB_DATA_TYPE_BINARY; } else { - int16_t t1; - val = tdQueryTagByID(elem->pTable->tagVal, pInfo->sch.colId, &t1); - assert(pInfo->sch.type == t1); + val = tdGetKVRowValOfCol(elem->pTable->tagVal, pInfo->sch.colId); } //todo :the val is possible to be null, so check it out carefully diff --git a/src/util/inc/tcoding.h b/src/util/inc/tcoding.h index cc9caf71d0..e22c959a56 100644 --- a/src/util/inc/tcoding.h +++ b/src/util/inc/tcoding.h @@ -29,12 +29,33 @@ extern "C" { static const int32_t TNUMBER = 1; #define IS_LITTLE_ENDIAN() (*(uint8_t *)(&TNUMBER) != 0) -static FORCE_INLINE void *taosEncodeFixed8(void *buf, uint8_t value) { +#define ZIGZAGE(T, v) ((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1) // zigzag encode +#define ZIGZAGD(T, v) ((v) >> 1) ^ -((T)((v)&1)) // zigzag decode + +// ---- Fixed U8 +static FORCE_INLINE void *taosEncodeFixedU8(void *buf, uint8_t value) { ((uint8_t *)buf)[0] = value; return POINTER_SHIFT(buf, sizeof(value)); } -static FORCE_INLINE void *taosEncodeFixed16(void *buf, uint16_t value) { +static FORCE_INLINE void *taosDecodeFixedU8(void *buf, uint8_t *value) { + *value = ((uint8_t *)buf)[0]; + return POINTER_SHIFT(buf, sizeof(*value)); +} + +// ---- Fixed I8 +static FORCE_INLINE void *taosEncodeFixedI8(void *buf, int8_t value) { + ((int8_t *)buf)[0] = value; + return POINTER_SHIFT(buf, sizeof(value)); +} + +static FORCE_INLINE void *taosDecodeFixedI8(void *buf, int8_t *value) { + *value = ((int8_t *)buf)[0]; + return POINTER_SHIFT(buf, sizeof(*value)); +} + +// ---- Fixed U16 +static FORCE_INLINE void *taosEncodeFixedU16(void *buf, uint16_t value) { if (IS_LITTLE_ENDIAN()) { memcpy(buf, &value, sizeof(value)); } else { @@ -45,7 +66,31 @@ static FORCE_INLINE void *taosEncodeFixed16(void *buf, uint16_t value) { return POINTER_SHIFT(buf, sizeof(value)); } -static FORCE_INLINE void *taosEncodeFixed32(void *buf, uint32_t value) { +static FORCE_INLINE void *taosDecodeFixedU16(void *buf, uint16_t *value) { + if (IS_LITTLE_ENDIAN()) { + memcpy(value, buf, sizeof(*value)); + } else { + ((uint8_t *)value)[1] = ((uint8_t *)buf)[0]; + ((uint8_t *)value)[0] = ((uint8_t *)buf)[1]; + } + + return POINTER_SHIFT(buf, sizeof(*value)); +} + +// ---- Fixed I16 +static FORCE_INLINE void *taosEncodeFixedI16(void *buf, int16_t value) { + return taosEncodeFixedU16(buf, ZIGZAGE(int16_t, value)); +} + +static FORCE_INLINE void *taosDecodeFixedI16(void *buf, int16_t *value) { + uint16_t tvalue = 0; + void * ret = taosDecodeFixedU16(buf, &tvalue); + *value = ZIGZAGD(int16_t, tvalue); + return ret; +} + +// ---- Fixed U32 +static FORCE_INLINE void *taosEncodeFixedU32(void *buf, uint32_t value) { if (IS_LITTLE_ENDIAN()) { memcpy(buf, &value, sizeof(value)); } else { @@ -58,7 +103,33 @@ static FORCE_INLINE void *taosEncodeFixed32(void *buf, uint32_t value) { return POINTER_SHIFT(buf, sizeof(value)); } -static FORCE_INLINE void *taosEncodeFixed64(void *buf, uint64_t value) { +static FORCE_INLINE void *taosDecodeFixedU32(void *buf, uint32_t *value) { + if (IS_LITTLE_ENDIAN()) { + memcpy(value, buf, sizeof(*value)); + } else { + ((uint8_t *)value)[3] = ((uint8_t *)buf)[0]; + ((uint8_t *)value)[2] = ((uint8_t *)buf)[1]; + ((uint8_t *)value)[1] = ((uint8_t *)buf)[2]; + ((uint8_t *)value)[0] = ((uint8_t *)buf)[3]; + } + + return POINTER_SHIFT(buf, sizeof(*value)); +} + +// ---- Fixed I32 +static FORCE_INLINE void *taosEncodeFixedI32(void *buf, int32_t value) { + return taosEncodeFixedU32(buf, ZIGZAGE(int32_t, value)); +} + +static FORCE_INLINE void *taosDecodeFixedI32(void *buf, int32_t *value) { + uint32_t tvalue = 0; + void * ret = taosDecodeFixedU32(buf, &tvalue); + *value = ZIGZAGD(int32_t, tvalue); + return ret; +} + +// ---- Fixed U64 +static FORCE_INLINE void *taosEncodeFixedU64(void *buf, uint64_t value) { if (IS_LITTLE_ENDIAN()) { memcpy(buf, &value, sizeof(value)); } else { @@ -75,36 +146,7 @@ static FORCE_INLINE void *taosEncodeFixed64(void *buf, uint64_t value) { return POINTER_SHIFT(buf, sizeof(value)); } -static FORCE_INLINE void *taosDecodeFixed8(void *buf, uint8_t *value) { - *value = ((uint8_t *)buf)[0]; - return POINTER_SHIFT(buf, sizeof(*value)); -} - -static FORCE_INLINE void *taosDecodeFixed16(void *buf, uint16_t *value) { - if (IS_LITTLE_ENDIAN()) { - memcpy(value, buf, sizeof(*value)); - } else { - ((uint8_t *)value)[1] = ((uint8_t *)buf)[0]; - ((uint8_t *)value)[0] = ((uint8_t *)buf)[1]; - } - - return POINTER_SHIFT(buf, sizeof(*value)); -} - -static FORCE_INLINE void *taosDecodeFixed32(void *buf, uint32_t *value) { - if (IS_LITTLE_ENDIAN()) { - memcpy(value, buf, sizeof(*value)); - } else { - ((uint8_t *)value)[3] = ((uint8_t *)buf)[0]; - ((uint8_t *)value)[2] = ((uint8_t *)buf)[1]; - ((uint8_t *)value)[1] = ((uint8_t *)buf)[2]; - ((uint8_t *)value)[0] = ((uint8_t *)buf)[3]; - } - - return POINTER_SHIFT(buf, sizeof(*value)); -} - -static FORCE_INLINE void *taosDecodeFixed64(void *buf, uint64_t *value) { +static FORCE_INLINE void *taosDecodeFixedU64(void *buf, uint64_t *value) { if (IS_LITTLE_ENDIAN()) { memcpy(value, buf, sizeof(*value)); } else { @@ -121,7 +163,20 @@ static FORCE_INLINE void *taosDecodeFixed64(void *buf, uint64_t *value) { return POINTER_SHIFT(buf, sizeof(*value)); } -static FORCE_INLINE void *taosEncodeVariant16(void *buf, uint16_t value) { +// ---- Fixed I64 +static FORCE_INLINE void *taosEncodeFixedI64(void *buf, int64_t value) { + return taosEncodeFixedU64(buf, ZIGZAGE(int64_t, value)); +} + +static FORCE_INLINE void *taosDecodeFixedI64(void *buf, int64_t *value) { + uint64_t tvalue = 0; + void * ret = taosDecodeFixedU64(buf, &tvalue); + *value = ZIGZAGD(int64_t, tvalue); + return ret; +} + +// ---- Variant U16 +static FORCE_INLINE void *taosEncodeVariantU16(void *buf, uint16_t value) { int i = 0; while (value >= ENCODE_LIMIT) { ((uint8_t *)buf)[i] = (value | ENCODE_LIMIT); @@ -132,39 +187,11 @@ static FORCE_INLINE void *taosEncodeVariant16(void *buf, uint16_t value) { ((uint8_t *)buf)[i] = value; - return POINTER_SHIFT(buf, i+1); -} - -static FORCE_INLINE void *taosEncodeVariant32(void *buf, uint32_t value) { - int i = 0; - while (value >= ENCODE_LIMIT) { - ((uint8_t *)buf)[i] = (value | ENCODE_LIMIT); - value >>= 7; - i++; - ASSERT(i < 5); - } - - ((uint8_t *)buf)[i] = value; - return POINTER_SHIFT(buf, i + 1); } -static FORCE_INLINE void *taosEncodeVariant64(void *buf, uint64_t value) { - int i = 0; - while (value >= ENCODE_LIMIT) { - ((uint8_t *)buf)[i] = (value | ENCODE_LIMIT); - value >>= 7; - i++; - ASSERT(i < 10); - } - - ((uint8_t *)buf)[i] = value; - - return POINTER_SHIFT(buf, i + 1); -} - -static FORCE_INLINE void *taosDecodeVariant16(void *buf, uint16_t *value) { - int i = 0; +static FORCE_INLINE void *taosDecodeVariantU16(void *buf, uint16_t *value) { + int i = 0; uint16_t tval = 0; *value = 0; while (i < 3) { @@ -181,8 +208,35 @@ static FORCE_INLINE void *taosDecodeVariant16(void *buf, uint16_t *value) { return NULL; // error happened } -static FORCE_INLINE void *taosDecodeVariant32(void *buf, uint32_t *value) { +// ---- Variant I16 +static FORCE_INLINE void *taosEncodeVariantI16(void *buf, int16_t value) { + return taosEncodeVariantU16(buf, ZIGZAGE(int16_t, value)); +} + +static FORCE_INLINE void *taosDecodeVariantI16(void *buf, int16_t *value) { + uint16_t tvalue = 0; + void * ret = taosDecodeVariantU16(buf, &tvalue); + *value = ZIGZAGD(int16_t, tvalue); + return ret; +} + +// ---- Variant U32 +static FORCE_INLINE void *taosEncodeVariantU32(void *buf, uint32_t value) { int i = 0; + while (value >= ENCODE_LIMIT) { + ((uint8_t *)buf)[i] = (value | ENCODE_LIMIT); + value >>= 7; + i++; + ASSERT(i < 5); + } + + ((uint8_t *)buf)[i] = value; + + return POINTER_SHIFT(buf, i + 1); +} + +static FORCE_INLINE void *taosDecodeVariantU32(void *buf, uint32_t *value) { + int i = 0; uint32_t tval = 0; *value = 0; while (i < 5) { @@ -199,8 +253,35 @@ static FORCE_INLINE void *taosDecodeVariant32(void *buf, uint32_t *value) { return NULL; // error happened } -static FORCE_INLINE void *taosDecodeVariant64(void *buf, uint64_t *value) { +// ---- Variant I32 +static FORCE_INLINE void *taosEncodeVariantI32(void *buf, int32_t value) { + return taosEncodeVariantU32(buf, ZIGZAGE(int32_t, value)); +} + +static FORCE_INLINE void *taosDecodeVariantI32(void *buf, int32_t *value) { + uint32_t tvalue = 0; + void * ret = taosDecodeVariantU32(buf, &tvalue); + *value = ZIGZAGD(int32_t, tvalue); + return ret; +} + +// ---- Variant U64 +static FORCE_INLINE void *taosEncodeVariantU64(void *buf, uint64_t value) { int i = 0; + while (value >= ENCODE_LIMIT) { + ((uint8_t *)buf)[i] = (value | ENCODE_LIMIT); + value >>= 7; + i++; + ASSERT(i < 10); + } + + ((uint8_t *)buf)[i] = value; + + return POINTER_SHIFT(buf, i + 1); +} + +static FORCE_INLINE void *taosDecodeVariantU64(void *buf, uint64_t *value) { + int i = 0; uint64_t tval = 0; *value = 0; while (i < 10) { @@ -217,10 +298,23 @@ static FORCE_INLINE void *taosDecodeVariant64(void *buf, uint64_t *value) { return NULL; // error happened } +// ---- Variant I64 +static FORCE_INLINE void *taosEncodeVariantI64(void *buf, int64_t value) { + return taosEncodeVariantU64(buf, ZIGZAGE(int64_t, value)); +} + +static FORCE_INLINE void *taosDecodeVariantI64(void *buf, int64_t *value) { + uint64_t tvalue = 0; + void * ret = taosDecodeVariantU64(buf, &tvalue); + *value = ZIGZAGD(int64_t, tvalue); + return ret; +} + +// ---- string static FORCE_INLINE void *taosEncodeString(void *buf, char *value) { size_t size = strlen(value); - buf = taosEncodeVariant64(buf, size); + buf = taosEncodeVariantU64(buf, size); memcpy(buf, value, size); return POINTER_SHIFT(buf, size); @@ -229,7 +323,7 @@ static FORCE_INLINE void *taosEncodeString(void *buf, char *value) { static FORCE_INLINE void *taosDecodeString(void *buf, char **value) { uint64_t size = 0; - buf = taosDecodeVariant64(buf, &size); + buf = taosDecodeVariantU64(buf, &size); *value = (char *)malloc(size + 1); if (*value == NULL) return NULL; memcpy(*value, buf, size); diff --git a/src/util/tests/codingTests.cpp b/src/util/tests/codingTests.cpp index a72c7ef291..57e21a828c 100644 --- a/src/util/tests/codingTests.cpp +++ b/src/util/tests/codingTests.cpp @@ -9,8 +9,18 @@ static bool test_fixed_uint16(uint16_t value) { char buf[20] = "\0"; uint16_t value_check = 0; - void *ptr1 = taosEncodeFixed16(static_cast(buf), value); - void *ptr2 = taosDecodeFixed16(static_cast(buf), &value_check); + void *ptr1 = taosEncodeFixedU16(static_cast(buf), value); + void *ptr2 = taosDecodeFixedU16(static_cast(buf), &value_check); + + return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); +} + +static bool test_fixed_int16(int16_t value) { + char buf[20] = "\0"; + int16_t value_check = 0; + + void *ptr1 = taosEncodeFixedI16(static_cast(buf), value); + void *ptr2 = taosDecodeFixedI16(static_cast(buf), &value_check); return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } @@ -19,8 +29,18 @@ static bool test_fixed_uint32(uint32_t value) { char buf[20] = "\0"; uint32_t value_check = 0; - void *ptr1 = taosEncodeFixed32(static_cast(buf), value); - void *ptr2 = taosDecodeFixed32(static_cast(buf), &value_check); + void *ptr1 = taosEncodeFixedU32(static_cast(buf), value); + void *ptr2 = taosDecodeFixedU32(static_cast(buf), &value_check); + + return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); +} + +static bool test_fixed_int32(int32_t value) { + char buf[20] = "\0"; + int32_t value_check = 0; + + void *ptr1 = taosEncodeFixedI32(static_cast(buf), value); + void *ptr2 = taosDecodeFixedI32(static_cast(buf), &value_check); return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } @@ -29,8 +49,18 @@ static bool test_fixed_uint64(uint64_t value) { char buf[20] = "\0"; uint64_t value_check = 0; - void *ptr1 = taosEncodeFixed64(static_cast(buf), value); - void *ptr2 = taosDecodeFixed64(static_cast(buf), &value_check); + void *ptr1 = taosEncodeFixedU64(static_cast(buf), value); + void *ptr2 = taosDecodeFixedU64(static_cast(buf), &value_check); + + return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); +} + +static bool test_fixed_int64(int64_t value) { + char buf[20] = "\0"; + int64_t value_check = 0; + + void *ptr1 = taosEncodeFixedI64(static_cast(buf), value); + void *ptr2 = taosDecodeFixedI64(static_cast(buf), &value_check); return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } @@ -39,8 +69,18 @@ static bool test_variant_uint16(uint16_t value) { char buf[20] = "\0"; uint16_t value_check = 0; - void *ptr1 = taosEncodeVariant16(static_cast(buf), value); - void *ptr2 = taosDecodeVariant16(static_cast(buf), &value_check); + void *ptr1 = taosEncodeVariantU16(static_cast(buf), value); + void *ptr2 = taosDecodeVariantU16(static_cast(buf), &value_check); + + return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); +} + +static bool test_variant_int16(int16_t value) { + char buf[20] = "\0"; + int16_t value_check = 0; + + void *ptr1 = taosEncodeVariantI16(static_cast(buf), value); + void *ptr2 = taosDecodeVariantI16(static_cast(buf), &value_check); return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } @@ -49,8 +89,18 @@ static bool test_variant_uint32(uint32_t value) { char buf[20] = "\0"; uint32_t value_check = 0; - void *ptr1 = taosEncodeVariant32(static_cast(buf), value); - void *ptr2 = taosDecodeVariant32(static_cast(buf), &value_check); + void *ptr1 = taosEncodeVariantU32(static_cast(buf), value); + void *ptr2 = taosDecodeVariantU32(static_cast(buf), &value_check); + + return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); +} + +static bool test_variant_int32(int32_t value) { + char buf[20] = "\0"; + int32_t value_check = 0; + + void *ptr1 = taosEncodeVariantI32(static_cast(buf), value); + void *ptr2 = taosDecodeVariantI32(static_cast(buf), &value_check); return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } @@ -59,8 +109,18 @@ static bool test_variant_uint64(uint64_t value) { char buf[20] = "\0"; uint64_t value_check = 0; - void *ptr1 = taosEncodeVariant64(static_cast(buf), value); - void *ptr2 = taosDecodeVariant64(static_cast(buf), &value_check); + void *ptr1 = taosEncodeVariantU64(static_cast(buf), value); + void *ptr2 = taosDecodeVariantU64(static_cast(buf), &value_check); + + return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); +} + +static bool test_variant_int64(int64_t value) { + char buf[20] = "\0"; + int64_t value_check = 0; + + void *ptr1 = taosEncodeVariantI64(static_cast(buf), value); + void *ptr2 = taosDecodeVariantI64(static_cast(buf), &value_check); return ((ptr2 != NULL) && (value == value_check) && (ptr1 == ptr2)); } @@ -68,49 +128,111 @@ static bool test_variant_uint64(uint64_t value) { TEST(codingTest, fixed_encode_decode) { srand(time(0)); + // uint16_t for (uint16_t value = 0; value <= UINT16_MAX; value++) { ASSERT_TRUE(test_fixed_uint16(value)); if (value == UINT16_MAX) break; } - ASSERT_TRUE(test_fixed_uint32(0)); - ASSERT_TRUE(test_fixed_uint32(UINT32_MAX)); - - for (int i = 0; i < 1000000; i++) { - ASSERT_TRUE(test_fixed_uint32(rand())); + // int16_t + for (int16_t value = INT16_MIN; value <= INT16_MAX; value++) { + ASSERT_TRUE(test_fixed_int16(value)); + if (value == INT16_MAX) break; } - std::mt19937_64 gen (std::random_device{}()); + std::mt19937 gen32(std::random_device{}()); + // uint32_t + ASSERT_TRUE(test_fixed_uint32(0)); + ASSERT_TRUE(test_fixed_uint32(UINT32_MAX)); + std::uniform_int_distribution distr1(0, UINT32_MAX); + + for (int i = 0; i < 1000000; i++) { + ASSERT_TRUE(test_fixed_uint32(distr1(gen32))); + } + + // int32_t + ASSERT_TRUE(test_fixed_int32(INT32_MIN)); + ASSERT_TRUE(test_fixed_int32(INT32_MAX)); + std::uniform_int_distribution distr2(INT32_MIN, INT32_MAX); + + for (int i = 0; i < 1000000; i++) { + ASSERT_TRUE(test_fixed_int32(distr2(gen32))); + } + + std::mt19937_64 gen64(std::random_device{}()); + // uint64_t + std::uniform_int_distribution distr3(0, UINT64_MAX); ASSERT_TRUE(test_fixed_uint64(0)); ASSERT_TRUE(test_fixed_uint64(UINT64_MAX)); for (int i = 0; i < 1000000; i++) { - ASSERT_TRUE(test_fixed_uint64(gen())); + ASSERT_TRUE(test_fixed_uint64(distr3(gen64))); + } + + // int64_t + std::uniform_int_distribution distr4(INT64_MIN, INT64_MAX); + + ASSERT_TRUE(test_fixed_int64(INT64_MIN)); + ASSERT_TRUE(test_fixed_int64(INT64_MAX)); + for (int i = 0; i < 1000000; i++) { + ASSERT_TRUE(test_fixed_int64(distr4(gen64))); } } TEST(codingTest, variant_encode_decode) { srand(time(0)); + // uint16_t for (uint16_t value = 0; value <= UINT16_MAX; value++) { ASSERT_TRUE(test_variant_uint16(value)); if (value == UINT16_MAX) break; } + // int16_t + for (int16_t value = INT16_MIN; value <= INT16_MAX; value++) { + ASSERT_TRUE(test_variant_int16(value)); + if (value == INT16_MAX) break; + } + + std::mt19937 gen32(std::random_device{}()); + // uint32_t + std::uniform_int_distribution distr1(0, UINT32_MAX); ASSERT_TRUE(test_variant_uint32(0)); ASSERT_TRUE(test_variant_uint32(UINT32_MAX)); for (int i = 0; i < 5000000; i++) { - ASSERT_TRUE(test_variant_uint32(rand())); + ASSERT_TRUE(test_variant_uint32(distr1(gen32))); } - std::mt19937_64 gen (std::random_device{}()); + // int32_t + std::uniform_int_distribution distr2(INT32_MIN, INT32_MAX); + ASSERT_TRUE(test_variant_int32(INT32_MIN)); + ASSERT_TRUE(test_variant_int32(INT32_MAX)); + + for (int i = 0; i < 5000000; i++) { + ASSERT_TRUE(test_variant_int32(distr2(gen32))); + } + + std::mt19937_64 gen64(std::random_device{}()); + // uint64_t + std::uniform_int_distribution distr3(0, UINT64_MAX); ASSERT_TRUE(test_variant_uint64(0)); ASSERT_TRUE(test_variant_uint64(UINT64_MAX)); for (int i = 0; i < 5000000; i++) { - uint64_t value = gen(); + // uint64_t value = gen(); // printf("%ull\n", value); - ASSERT_TRUE(test_variant_uint64(value)); + ASSERT_TRUE(test_variant_uint64(distr3(gen64))); + } + + // int64_t + std::uniform_int_distribution distr4(INT64_MIN, INT64_MAX); + + ASSERT_TRUE(test_variant_int64(INT64_MIN)); + ASSERT_TRUE(test_variant_int64(INT64_MAX)); + for (int i = 0; i < 5000000; i++) { + // uint64_t value = gen(); + // printf("%ull\n", value); + ASSERT_TRUE(test_variant_int64(distr4(gen64))); } } \ No newline at end of file diff --git a/src/vnode/src/vnodeWrite.c b/src/vnode/src/vnodeWrite.c index 584aa1bf2f..90e2a482e9 100644 --- a/src/vnode/src/vnodeWrite.c +++ b/src/vnode/src/vnodeWrite.c @@ -104,65 +104,14 @@ static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pR } static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { - SMDCreateTableMsg *pTable = pCont; - int32_t code = 0; - - vTrace("vgId:%d, table:%s, start to create", pVnode->vgId, pTable->tableId); - int16_t numOfColumns = htons(pTable->numOfColumns); - int16_t numOfTags = htons(pTable->numOfTags); - int32_t sid = htonl(pTable->sid); - uint64_t uid = htobe64(pTable->uid); - SSchema * pSchema = (SSchema *)pTable->data; - STSchema *pDestTagSchema = NULL; - SDataRow dataRow = NULL; - int32_t totalCols = numOfColumns + numOfTags; - - STableCfg tCfg; - tsdbInitTableCfg(&tCfg, pTable->tableType, uid, sid); + STableCfg *pCfg = tsdbCreateTableCfgFromMsg((SMDCreateTableMsg *)pCont); + if (pCfg == NULL) return terrno; + int32_t code = tsdbCreateTable(pVnode->tsdb, pCfg); - STSchema *pDestSchema = tdNewSchema(numOfColumns); - for (int i = 0; i < numOfColumns; i++) { - tdSchemaAddCol(pDestSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); - } - tsdbTableSetSchema(&tCfg, pDestSchema, false); - tsdbTableSetName(&tCfg, pTable->tableId, false); - - if (numOfTags != 0) { - pDestTagSchema = tdNewSchema(numOfTags); - for (int i = numOfColumns; i < totalCols; i++) { - tdSchemaAddCol(pDestTagSchema, pSchema[i].type, htons(pSchema[i].colId), htons(pSchema[i].bytes)); - } - tsdbTableSetTagSchema(&tCfg, pDestTagSchema, false); - tsdbTableSetSName(&tCfg, pTable->superTableId, false); - tsdbTableSetSuperUid(&tCfg, htobe64(pTable->superTableUid)); - - char *pTagData = pTable->data + totalCols * sizeof(SSchema); - int accumBytes = 0; - dataRow = tdNewTagRowFromSchema(pDestTagSchema, numOfTags); - - for (int i = 0; i < numOfTags; i++) { - STColumn *pTCol = schemaColAt(pDestTagSchema, i); - tdAppendTagColVal(dataRow, pTagData + accumBytes, pTCol->type, pTCol->bytes, pTCol->colId); - accumBytes += htons(pSchema[i + numOfColumns].bytes); - } - tsdbTableSetTagValue(&tCfg, dataRow, false); - } - - // only normal has sql string - if (pTable->tableType == TSDB_STREAM_TABLE) { - char *sql = pTable->data + totalCols * sizeof(SSchema); - vTrace("vgId:%d, table:%s is creating, sql:%s", pVnode->vgId, pTable->tableId, sql); - tsdbTableSetStreamSql(&tCfg, sql, false); - } - - code = tsdbCreateTable(pVnode->tsdb, &tCfg); - tdFreeDataRow(dataRow); - tfree(pDestTagSchema); - tfree(pDestSchema); - - vTrace("vgId:%d, table:%s is created, result:%x", pVnode->vgId, pTable->tableId, code); - return code; + tsdbClearTableCfg(pCfg); + free(pCfg); + return code; } static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {