diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index 83c15cdbdb..08b3971b3a 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -24,6 +24,9 @@ extern "C" { #endif +// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap. +#define TD_SUPPORT_BITMAP + #define STR_TO_VARSTR(x, str) \ do { \ VarDataLenT __len = (VarDataLenT)strlen(str); \ @@ -118,6 +121,10 @@ typedef struct { STColumn *columns; } STSchemaBuilder; +#define TD_VTYPE_BITS 2 // val type +#define TD_VTYPE_PARTS 4 // 8 bits / TD_VTYPE_BITS = 4 +#define TD_VTYPE_OPTR 3 // TD_VTYPE_PARTS - 1, utilize to get remainder + int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version); void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder); void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version); @@ -125,6 +132,8 @@ int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder); // ----------------- Semantic timestamp key definition +#ifdef TD_2 .0 + typedef uint64_t TKEY; #define TKEY_INVALID UINT64_MAX @@ -144,6 +153,29 @@ typedef uint64_t TKEY; #define TD_TO_TKEY(key) tdGetTKEY(((key) < MIN_TS_KEY) ? MIN_TS_KEY : (((key) > MAX_TS_KEY) ? MAX_TS_KEY : key)) +#else + +typedef uint64_t TKEY; + +#define TKEY_INVALID UINT64_MAX +#define TKEY_NULL TKEY_INVALID +#define TKEY_NEGATIVE_FLAG (((TKEY)1) << 63) +#define TKEY_DELETE_FLAG (((TKEY)1) << 62) +#define TKEY_VALUE_FILTER (~(TKEY_NEGATIVE_FLAG | TKEY_DELETE_FLAG)) + +#define TKEY_IS_NEGATIVE(tkey) (((tkey)&TKEY_NEGATIVE_FLAG) != 0) +#define TKEY_IS_DELETED(tkey) (((tkey)&TKEY_DELETE_FLAG) != 0) +#define tdSetTKEYDeleted(tkey) ((tkey) | TKEY_DELETE_FLAG) +#define tdGetTKEY(key) (((TKEY)ABS(key)) | (TKEY_NEGATIVE_FLAG & (TKEY)(key))) +#define tdGetKey(tkey) (((TSKEY)((tkey)&TKEY_VALUE_FILTER)) * (TKEY_IS_NEGATIVE(tkey) ? -1 : 1)) + +#define MIN_TS_KEY ((TSKEY)0x8000000000000001) +#define MAX_TS_KEY ((TSKEY)0x7fffffffffffffff) + +#define TD_TO_TKEY(key) tdGetTKEY(((key) < MIN_TS_KEY) ? MIN_TS_KEY : (((key) > MAX_TS_KEY) ? MAX_TS_KEY : key)) + +#endif + static FORCE_INLINE TKEY keyToTkey(TSKEY key) { TSKEY lkey = key; if (key > MAX_TS_KEY) { @@ -718,31 +750,6 @@ static FORCE_INLINE int32_t tdGetColAppendLen(uint8_t rowType, const void *value return len; } -/** - * 1. calculate the delta of AllNullLen for SDataRow. - * 2. calculate the real len for SKVRow. - */ -static FORCE_INLINE void tdGetColAppendDeltaLen(const void *value, int8_t colType, int32_t *dataLen, int32_t *kvLen) { - switch (colType) { - case TSDB_DATA_TYPE_BINARY: { - int32_t varLen = varDataLen(value); - *dataLen += (varLen - CHAR_BYTES); - *kvLen += (varLen + sizeof(SColIdx)); - break; - } - case TSDB_DATA_TYPE_NCHAR: { - int32_t varLen = varDataLen(value); - *dataLen += (varLen - TSDB_NCHAR_SIZE); - *kvLen += (varLen + sizeof(SColIdx)); - break; - } - default: { - *kvLen += (TYPE_BYTES[colType] + sizeof(SColIdx)); - break; - } - } -} - typedef struct { int16_t colId; uint8_t colType; @@ -757,54 +764,6 @@ static FORCE_INLINE void setSColInfo(SColInfo *colInfo, int16_t colId, uint8_t c SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2); -#if 0 -// ----------------- Raw payload structure for row: -/* |<------------ Head ------------->|<----------- body of column data tuple ------------------->| - * | |<----------------- flen ------------->|<--- value part --->| - * |SMemRowType| dataTLen | nCols | colId | colType | offset | ... | value |...|...|... | - * +-----------+----------+----------+--------------------------------------|--------------------| - * | uint8_t | uint32_t | uint16_t | int16_t | uint8_t | uint16_t | ... |.......|...|...|... | - * +-----------+----------+----------+--------------------------------------+--------------------| - * 1. offset in column data tuple starts from the value part in case of uint16_t overflow. - * 2. dataTLen: total length including the header and body. - */ - -#define PAYLOAD_NCOLS_LEN sizeof(uint16_t) -#define PAYLOAD_NCOLS_OFFSET (sizeof(uint8_t) + sizeof(TDRowLenT)) -#define PAYLOAD_HEADER_LEN (PAYLOAD_NCOLS_OFFSET + PAYLOAD_NCOLS_LEN) -#define PAYLOAD_ID_LEN sizeof(int16_t) -#define PAYLOAD_ID_TYPE_LEN (sizeof(int16_t) + sizeof(uint8_t)) -#define PAYLOAD_COL_HEAD_LEN (PAYLOAD_ID_TYPE_LEN + sizeof(uint16_t)) -#define PAYLOAD_PRIMARY_COL_LEN (PAYLOAD_ID_TYPE_LEN + sizeof(TSKEY)) - -#define payloadBody(r) POINTER_SHIFT(r, PAYLOAD_HEADER_LEN) -#define payloadType(r) (*(uint8_t *)(r)) -#define payloadSetType(r, t) (payloadType(r) = (t)) -#define payloadTLen(r) (*(TDRowLenT *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) // including total header -#define payloadSetTLen(r, l) (payloadTLen(r) = (l)) -#define payloadNCols(r) (*(TDRowLenT *)POINTER_SHIFT(r, PAYLOAD_NCOLS_OFFSET)) -#define payloadSetNCols(r, n) (payloadNCols(r) = (n)) -#define payloadValuesOffset(r) \ - (PAYLOAD_HEADER_LEN + payloadNCols(r) * PAYLOAD_COL_HEAD_LEN) // avoid using the macro in loop -#define payloadValues(r) POINTER_SHIFT(r, payloadValuesOffset(r)) // avoid using the macro in loop -#define payloadColId(c) (*(int16_t *)(c)) -#define payloadColType(c) (*(uint8_t *)POINTER_SHIFT(c, PAYLOAD_ID_LEN)) -#define payloadColOffset(c) (*(uint16_t *)POINTER_SHIFT(c, PAYLOAD_ID_TYPE_LEN)) -#define payloadColValue(c) POINTER_SHIFT(c, payloadColOffset(c)) - -#define payloadColSetId(c, i) (payloadColId(c) = (i)) -#define payloadColSetType(c, t) (payloadColType(c) = (t)) -#define payloadColSetOffset(c, o) (payloadColOffset(c) = (o)) - -#define payloadTSKey(r) (*(TSKEY *)POINTER_SHIFT(r, payloadValuesOffset(r))) -#define payloadTKey(r) (*(TKEY *)POINTER_SHIFT(r, payloadValuesOffset(r))) -#define payloadKey(r) tdGetKey(payloadTKey(r)) - - -static FORCE_INLINE char *payloadNextCol(char *pCol) { return (char *)POINTER_SHIFT(pCol, PAYLOAD_COL_HEAD_LEN); } - -#endif - #ifdef __cplusplus } #endif diff --git a/include/common/trow.h b/include/common/trow.h index f562521dad..8137117510 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -17,101 +17,64 @@ #define _TD_COMMON_ROW_H_ #include "os.h" +#include "tdef.h" +#include "taoserror.h" +#include "talgo.h" #include "tbuffer.h" #include "tdataformat.h" #include "tdef.h" #include "tschema.h" +#include "ttypes.h" #ifdef __cplusplus extern "C" { #endif -#define TD_SUPPORT_NONE_VAL +// Target of trow.h: +// 1. Row related definition in dataformat.h of 2.0 could be replaced with trow.h of 3.0. +// 2. The basic definition in dataformat.h is shared with trow.h of 3.0. // row type -#define TD_ROW_TP 0 // default -#define TD_ROW_KV 1 +#define TD_ROW_TP 0x0 // default +#define TD_ROW_KV 0x01 // val type #define TD_VTYPE_NORM 0x0 // normal val: not none, not null #define TD_VTYPE_NONE 0x01 // none or unknown/undefined #define TD_VTYPE_NULL 0x02 // null val -#ifdef TD_SUPPORT_NONE_VAL +#define isSelectKVRow(klen, tlen) ((klen) < (tlen)) + +#ifdef TD_SUPPORT_BITMAP static FORCE_INLINE bool tdValTypeIsNorm(int8_t valType) { return (valType & TD_VTYPE_NORM); } static FORCE_INLINE bool tdValTypeIsNone(int8_t valType) { return (valType & TD_VTYPE_NONE); } static FORCE_INLINE bool tdValTypeIsNull(int8_t valType) { return (valType & TD_VTYPE_NULL); } -#else - #endif -static FORCE_INLINE bool tdValIsNorm(int8_t valType, const void *val, int32_t type) { -#ifdef TD_SUPPORT_NONE_VAL +static FORCE_INLINE bool tdValIsNorm(int8_t valType, const void *val, int32_t colType) { +#ifdef TD_SUPPORT_BITMAP return tdValTypeIsNorm(valType); #else - return !isNull(val, type); + return !isNull(val, colType); #endif } + static FORCE_INLINE bool tdValIsNone(int8_t valType) { -#ifdef TD_SUPPORT_NONE_VAL +#ifdef TD_SUPPORT_BITMAP return tdValTypeIsNone(valType); #else return false; #endif } -static FORCE_INLINE bool tdValIsNull(int8_t valType, const void *val, int32_t type) { -#ifdef TD_SUPPORT_NONE_VAL + +static FORCE_INLINE bool tdValIsNull(int8_t valType, const void *val, int32_t colType) { +#ifdef TD_SUPPORT_BITMAP return tdValTypeIsNull(valType); #else - return isNull(val, type); + return isNull(val, colType); #endif } -#define TD_ROW_LEN(r) -#define TD_ROW_TLEN(r) -#define TD_ROW_TYPE(r) -#define TD_ROW_BODY(r) -#define TD_ROW_TKEY(r) -#define TD_ROW_KEY(r) -#define TD_ROW_DELETED(r) -#define TD_ROW_VERSION(r) -#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) - -#define TD_ROW_SET_LEN(r, l) -#define TD_ROW_SET_VERSION(r, v) -#define TD_ROW_CPY(dst, r) - -// ----------------- SRow appended with tuple row structure(STpRow) -/* - * |---------|------------------------------------------------- len -------------------------------------->| - * |<-------- Head ------>|<--------- flen ------------->|<-- blen -->| | - * |---------+---------------------+---------------------------------+-------------+-----------------------+ - * | uint8_t | uint32_t | int16_t | | | | - * |---------+----------+----------+---------------------------------+-------------------------------------+ - * | flag | len | sversion |(key) first part | bitmap | second part | - * +---------+----------+----------+---------------------------------+-------------------------------------+ - * - * NOTE: Timestamp in this row structure is TKEY instead of TSKEY - * Use 2 bits in bitmap for each column - * flag: - * 0: flag&0x01 0 STpRow, 1 SKvRow // since 2.0 - * 1: flag&0x02 0 without bitmap, 1 with bitmap. // 如果None值支持数据库或者更小维度,则需要指定一个bit区分。TODO - * 2: endian(0 big endian, 1 little endian) - * 3-7: reserved(value 0) - */ - -// ----------------- SRow appended with K-V row structure(SKvRow) -/* |--------------------|------------------------------------------------ len --------------------------->| - * |<-------- Head ---->|<--------- colsIdxLen ---------->|<-- blen -->| | - * |--------------------+----------+-----------------------------------------------------------------------+ - * | uint8_t | uint32_t | int16_t | | | | - * |---------+----------+----------+-----------------------------------------------------------------------+ - * | flag | len | ncols |(keyColId) cols index | bitmap | data part | - * |---------+----------+----------+---------------------------------+-------------------------------------+ - * - * NOTE: Timestamp in this row structure is TKEY instead of TSKEY - */ - typedef void *SRow; typedef struct { @@ -140,32 +103,385 @@ typedef struct { struct { /// row type uint32_t type : 2; + /// is delete row(0 not delete, 1 delete) + uint32_t del : 1; + /// endian(0 little endian, 1 big endian) + uint32_t endian : 1; + /// reserved for back compatibility + uint32_t reserve : 12; /// row schema version uint32_t sver : 16; - /// is delete row - uint32_t del : 1; - /// reserved for back compatibility - uint32_t reserve : 13; }; }; /// row total length uint32_t len; + /// nCols of SRow(only valid for K-V row) + uint64_t ncols : 16; /// row version - uint64_t ver; + uint64_t ver : 48; /// timestamp TSKEY ts; /// the inline data, maybe a tuple or a k-v tuple char data[]; } STSRow; +#define TD_ROW_HEAD_LEN (sizeof(STSRow)) + +#define TD_ROW_TYPE(r) ((r)->type) +#define TD_ROW_DELETE(r) ((r)->del) +#define TD_ROW_ENDIAN(r) ((r)->endian) +#define TD_ROW_SVER(r) ((r)->sver) +#define TD_ROW_NCOLS(r) ((r)->ncols) +#define TD_ROW_DATA(r) ((r)->data) +#define TD_ROW_LEN(r) ((r)->len) +#define TD_ROW_TSKEY(r) ((r)->ts) + +// N.B. If without STSchema, getExtendedRowSize() is used to get the rowMaxBytes and (int)ceil((double)nCols/TD_VTYPE_PARTS) +// should be added if TD_SUPPORT_BITMAP defined. +#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) (schemaTLen(s) + TD_ROW_HEAD_LEN) + +#define TD_ROW_SET_TYPE(r, t) (TD_ROW_TYPE(r) = (t)) +#define TD_ROW_SET_DELETE(r) (TD_ROW_DELETE(r) = 1) +#define TD_ROW_SET_SVER(r, v) (TD_ROW_SVER(r) = (v)) +#define TD_ROW_SET_LEN(r, l) (TD_ROW_LEN(r) = (l)) +#define TD_ROW_CPY(dst, r) memcpy((dst), (r), TD_ROW_LEN(r)) + +#define TD_ROW_IS_DELETED(r) (TD_ROW_DELETE(r)) +#define TD_IS_TP_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_TP) +#define TD_IS_KV_ROW(r) (TD_ROW_TYPE(r) == TD_ROW_KV) +#define TD_BOOL_STR(b) ((b) ? "true" : "false") +#define isUtilizeKVRow(k, d) ((k) < ((d)*KVRatioConvert)) + +#define TD_ROW_OFFSET(p) ((p)->toffset); + +#ifdef TD_SUPPORT_BITMAP +static FORCE_INLINE void *tdRowBitmap(STSRow *pRow, uint16_t flen) { + switch (pRow->type) { + case TD_ROW_TP: + return POINTER_SHIFT(pRow->data, flen); + case TD_ROW_KV: + return POINTER_SHIFT(pRow->data, pRow->ncols * sizeof(SKvRowIdx)); + default: + break; + } + return NULL; +} +#endif + +STpRow tdNewTpRowFromSchema(STSchema *pSchema); +void tdFreeTpRow(STpRow row); +void tdInitTpRow(STpRow row, STSchema *pSchema); +STpRow tdTpRowDup(STpRow row); + +// ----------------- Tuple row structure(STpRow) +/* + * |<----------------------------- tlen ---------------------------------->| + * |<--------- flen ------------->|<-- blen -->| | + * +---------------------------------+-------------+-----------------------+ + * | | | | + * +---------------------------------+-------------------------------------+ + * | first part | bitmap | second part | + * +---------------------------------+-------------+-----------------------+ + * + */ + +// ----------------- K-V row structure(SKvRow) +/* + * |<--------- colsIdxLen ---------->|<-- blen -->| | + * +---------------------------------+------------+------------------------+ + * | | | | + * +-----------------------------------------------------------------------+ + * | cols index | bitmap | data part | + * +---------------------------------+------------+------------------------+ + * + */ +typedef struct { + // necessary info + int8_t rowType; + int16_t sver; + STSRow *pBuf; + + // auxiliary info + int32_t flen; + int16_t nBoundCols; + int16_t nCols; + int16_t nBitmaps; + int16_t nBoundBitmaps; + int32_t offset; + void * pBitmap; + void * pOffset; +} SRowBuilder; + +/** + * @brief + * + * @param pBuilder + * @param sversion schema version + * @return int32_t + */ +int32_t tdSRowInit(SRowBuilder *pBuilder, int16_t sversion) { + pBuilder->rowType = TD_ROW_TP; + pBuilder->sver = sversion; + return TSDB_CODE_SUCCESS; +} + +/** + * @brief 一般不建议使用,除非特殊情况 + * + * @param pBuilder + * @param rowType + * @return FORCE_INLINE + */ +static FORCE_INLINE void tdSRowSetRowType(SRowBuilder *pBuilder, int8_t rowType) { pBuilder->rowType = rowType; } + +/** + * @brief 用于判定采用STpRow/SKvRow, + * + * @param pBuilder + * @param allNullLen 无法获取则填写-1 + * @param boundNullLen 无法获取则填写-1 + * @param nCols + * @param nBoundCols + * @param flen + * @return FORCE_INLINE + */ +static FORCE_INLINE int32_t tdSRowSetExtendedInfo(SRowBuilder *pBuilder, int32_t allNullLen, int32_t boundNullLen, + int32_t nCols, int32_t nBoundCols, int32_t flen) { + if ((boundNullLen > 0) && (allNullLen > 0) && isSelectKVRow(boundNullLen, allNullLen)) { + pBuilder->rowType = TD_ROW_KV; + } + pBuilder->nBoundCols = nBoundCols; + pBuilder->nCols = nCols; + pBuilder->flen = flen; + if (pBuilder->flen <= 0 || pBuilder->nCols <= 0 || pBuilder->nBoundCols <= 0) { + terrno = TSDB_CODE_INVALID_PARA; + return TSDB_CODE_INVALID_PARA; + } +#ifdef TD_SUPPORT_BITMAP + pBuilder->nBitmaps = (int16_t)ceil((double)pBuilder->nCols / TD_VTYPE_PARTS); + pBuilder->nBoundBitmaps = (int16_t)ceil((double)pBuilder->nBoundCols / TD_VTYPE_PARTS); +#else + pBuilder->nBitmaps = 0; + pBuilder->nBoundBitmaps = 0; +#endif + return TSDB_CODE_SUCCESS; +} + +/** + * @brief 在pBuf位置生成SRow + * + * @param pBuilder + * @param pBuf + */ +int32_t tdSRowResetBuf(SRowBuilder *pBuilder, void *pBuf) { + pBuilder->pBuf = pBuf; + if (!pBuilder->pBuf) { + terrno = TSDB_CODE_INVALID_PARA; + return TSDB_CODE_INVALID_PARA; + } + switch (pBuilder->rowType) { + case TD_ROW_TP: +#ifdef TD_SUPPORT_BITMAP + pBuilder->pBitmap = POINTER_SHIFT(pBuilder->pBuf, pBuilder->flen); +#endif + uint32_t len = TD_ROW_HEAD_LEN + pBuilder->flen + pBuilder->nBitmaps; + TD_ROW_SET_LEN(pBuilder->pBuf, len); + TD_ROW_SET_SVER(pBuilder->pBuf, pBuilder->sver); + break; + case TD_ROW_KV: + uint32_t len = pBuilder->nBoundCols * sizeof(SKvRowIdx); +#ifdef TD_SUPPORT_BITMAP + pBuilder->pBitmap = POINTER_SHIFT(pBuilder->pBuf, len); +#endif + len += (TD_ROW_HEAD_LEN + pBuilder->nBoundBitmaps); + TD_ROW_SET_LEN(pBuilder->pBuf, len); + TD_ROW_SET_SVER(pBuilder->pBuf, pBuilder->sver); + break; + default: + terrno = TSDB_CODE_INVALID_PARA; + return TSDB_CODE_INVALID_PARA; + } + return TSDB_CODE_SUCCESS; +} + +/** + * @brief 由调用方管理存储空间的分配及释放,一次输入多个参数 + * + * @param pBuilder + * @param pBuf + * @param allNullLen + * @param boundNullLen + * @param nCols + * @param nBoundCols + * @param flen + * @return FORCE_INLINE + */ +static FORCE_INLINE int32_t tdSRowInitEx(SRowBuilder *pBuilder, void *pBuf, uint32_t allNullLen, uint32_t boundNullLen, + int32_t nCols, int32_t nBoundCols, int32_t flen) { + + tdSRowSetExtendedInfo(pBuilder, allNullLen, boundNullLen, nCols, nBoundCols, flen); + return tdSRowResetBuf(pBuilder, pBuf); +} + +/** + * @brief + * + * @param pBuilder + */ +void tdSRowReset(SRowBuilder *pBuilder) { + pBuilder->rowType = TD_ROW_TP; + pBuilder->pBuf = NULL; + pBuilder->nBoundCols = -1; + pBuilder->nCols = -1; + pBuilder->flen = -1; + pBuilder->pBitmap = NULL; +} + +/** + * @brief + * + * @param pBuilder + * @param colId start from PRIMARYKEY_TIMESTAMP_COL_ID + * @param colType + * @param val + * @param valType + * @param offset + * @param idx sorted column index, start from 0 + * @return FORCE_INLINE + */ +static FORCE_INLINE int32_t tdAppendColValToRow(SRowBuilder *pBuilder, int16_t colId, int8_t colType, const void *val, + int8_t valType, int32_t offset, int16_t idx) { + STSRow *pRow = pBuilder->pBuf; + void * pBitmap = NULL; + if (!val) { +#ifdef TD_SUPPORT_BITMAP + if (tdValIsNorm(valType, val, colType)) { + terrno = TSDB_CODE_INVALID_PTR; + return terrno; + } +#else + terrno = TSDB_CODE_INVALID_PTR; + return terrno; +#endif + } + // TS KEY is stored in STSRow.ts and not included in STSRow.data field. + if (colId == PRIMARYKEY_TIMESTAMP_COL_ID) { + TD_ROW_TSKEY(pRow) = *(TSKEY *)val; +#ifdef TD_SUPPORT_BITMAP + pBitmap = tdRowBitmap(pRow, pBuilder->flen); + if (tdSetBitmap(pBitmap, idx, valType) != TSDB_CODE_SUCCESS) { + return terrno; + } +#endif + return TSDB_CODE_SUCCESS; + } + // TODO: We can avoid the type judegement by FP, but would prevent the inline scheme. + // typedef int (*tdAppendColValToSRowFp)(STSRow *pRow, void *pBitmap, int16_t colId, int8_t colType, + // const void *val, int8_t valType, int32_t tOffset, int16_t tIdx); + if (TD_IS_TP_ROW(pRow)) { + tdAppendColValToTpRow(pRow->data, pBitmap, val, true, colType, valType, idx, offset); + } else { + tdAppendColValToKvRow(pRow->data, pBitmap, val, true, colType, valType, idx, offset, colId); + } + return TSDB_CODE_SUCCESS; +} + +static FORCE_INLINE int tdAppendColValToTpRow(STSRow *row, void *pBitmap, const void *val, bool isCopyVarData, + int8_t colType, int8_t valType, int16_t idx, int32_t offset) { + ASSERT(offset >= sizeof(TSDB_DATA_TYPE_TIMESTAMP)); + if (!tdValIsNone(valType)) { + if (IS_VAR_DATA_TYPE(colType)) { + // ts key stored in STSRow.ts + *(VarDataOffsetT *)POINTER_SHIFT(row->data, offset - sizeof(TSDB_DATA_TYPE_TIMESTAMP)) = TD_ROW_LEN(row); + if (isCopyVarData) { + memcpy(POINTER_SHIFT(row, TD_ROW_LEN(row)), val, varDataTLen(val)); + } + TD_ROW_LEN(row) += varDataTLen(val); + } else { + memcpy(POINTER_SHIFT(row->data, offset - sizeof(TSDB_DATA_TYPE_TIMESTAMP)), val, TYPE_BYTES[colType]); + } + } + +#ifdef TD_SUPPORT_BITMAP + tdSetBitmap(pBitmap, idx, valType); +#endif + + return 0; +} + +static FORCE_INLINE int tdAppendColValToKvRow(STSRow *row, void *pBitmap, const void *val, bool isCopyValData, + int8_t colType, int8_t valType, int16_t idx, int32_t offset, + int16_t colId) { + ASSERT(offset >= sizeof(SKvRowIdx)); + + if (!tdValIsNone(valType)) { + // ts key stored in STSRow.ts + SColIdx *pColIdx = (SColIdx *)POINTER_SHIFT(row->data, offset - sizeof(SKvRowIdx)); + char * ptr = (char *)POINTER_SHIFT(row, TD_ROW_LEN(row)); + pColIdx->colId = colId; + pColIdx->offset = TD_ROW_LEN(row); // the offset include the TD_ROW_HEAD_LEN + + if (IS_VAR_DATA_TYPE(colType)) { + if (isCopyValData) { + memcpy(ptr, val, varDataTLen(val)); + } + TD_ROW_LEN(row) += varDataTLen(val); + } else { + memcpy(ptr, val, TYPE_BYTES[colType]); + TD_ROW_LEN(row) += TYPE_BYTES[colType]; + } + } + +#ifdef TD_SUPPORT_BITMAP + tdSetBitmap(pBitmap, idx, valType); +#endif + + return 0; +} + +static FORCE_INLINE int32_t tdSetBitmap(void *pBitmap, int16_t tIdx, int8_t valType) { + if (!pBitmap) { + terrno = TSDB_CODE_INVALID_PTR; + return terrno; + } + int16_t nBytes = tIdx / TD_VTYPE_PARTS; + int16_t nOffset = tIdx & TD_VTYPE_OPTR; + char * pDestByte = (char *)POINTER_SHIFT(pBitmap, nBytes); + switch (nOffset) { + case 0: + *pDestByte = ((*pDestByte) & 0x3F) | (valType << 6); + break; + case 1: + *pDestByte = ((*pDestByte) & 0xCF) | (valType << 4); + break; + case 2: + *pDestByte = ((*pDestByte) & 0xF3) | (valType << 2); + break; + case 3: + *pDestByte = ((*pDestByte) & 0xFC) | valType; + break; + default: + terrno = TSDB_CODE_INVALID_PARA; + return terrno; + } + return TSDB_CODE_SUCCESS; +} + +#ifdef TROW_ORIGIN_HZ typedef struct { uint32_t nRows; char rows[]; } STSRowBatch; +static void tdSRowPrint(STSRow *row) { + printf("type:%d, del:%d, sver:%d\n", row->type, row->del, row->sver); + printf("isDeleted:%s, isTpRow:%s, isKvRow:%s\n", TD_BOOL_STR(TD_ROW_IS_DELETED(row)), TD_BOOL_STR(TD_IS_TP_ROW(row)), + TD_BOOL_STR(TD_IS_KV_ROW(row))); +} + typedef enum { - /// ordinary row builder - TD_OR_ROW_BUILDER = 0, + /// tuple row builder + TD_TP_ROW_BUILDER = 0, /// kv row builder TD_KV_ROW_BUILDER, /// self-determined row builder @@ -208,6 +524,7 @@ int tRowReaderRead(STSRowReader *pRowReader, col_id_t cid, void *target, uint64_ #define tRowBatchIterInit(pRB) \ { .it = 0, .pRowBatch = (pRB) } const STSRow *tRowBatchIterNext(STSRowBatchIter *pRowBatchIter); +#endif #ifdef __cplusplus } diff --git a/include/util/types.h b/include/util/types.h index dca8cd9a91..cb25448cc4 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -72,7 +72,7 @@ static FORCE_INLINE double taos_align_get_double(const char *pBuf) { #define SET_DOUBLE_PTR(x, y) { (*(double *)(x)) = (*(double *)(y)); } // #endif -typedef int16_t VarDataLenT; // maxVarDataLen: 32767 +typedef uint16_t VarDataLenT; // maxVarDataLen: 32767 #define VARSTR_HEADER_SIZE sizeof(VarDataLenT) #define varDataLen(v) ((VarDataLenT *)(v))[0] diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 075c9a1628..e0d4aa5820 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -185,6 +185,10 @@ STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) { schemaFLen(pSchema) = pBuilder->flen; schemaVLen(pSchema) = pBuilder->vlen; +#ifdef TD_SUPPORT_BITMAP + schemaTLen(pSchema) += (int)ceil((double)schemaNCols(pSchema) / TD_VTYPE_PARTS); +#endif + memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols); return pSchema; diff --git a/source/common/src/trow.c b/source/common/src/trow.c index f383cd04dc..42dcf68b01 100644 --- a/source/common/src/trow.c +++ b/source/common/src/trow.c @@ -32,4 +32,874 @@ int trbWriteCol(SRowBuilder *pRB, void *pData, col_id_t cid) { // TODO return 0; } + +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ +#include "tdataformat.h" +#include "ulog.h" +#include "talgo.h" +#include "tcoding.h" +#include "wchar.h" +#include "tarray.h" + +static void dataColSetNEleNull(SDataCol *pCol, int nEle); +static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, + int limit2, int tRows, bool forceSetNull); + +int tdAllocMemForCol(SDataCol *pCol, int maxPoints) { + int spaceNeeded = pCol->bytes * maxPoints; + if(IS_VAR_DATA_TYPE(pCol->type)) { + spaceNeeded += sizeof(VarDataOffsetT) * maxPoints; + } + if(pCol->spaceSize < spaceNeeded) { + void* ptr = realloc(pCol->pData, spaceNeeded); + if(ptr == NULL) { + uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)spaceNeeded, + strerror(errno)); + return -1; + } else { + pCol->pData = ptr; + pCol->spaceSize = spaceNeeded; + } + } + if(IS_VAR_DATA_TYPE(pCol->type)) { + pCol->dataOff = POINTER_SHIFT(pCol->pData, pCol->bytes * maxPoints); + } + return 0; +} + +/** + * Duplicate the schema and return a new object + */ +STSchema *tdDupSchema(const STSchema *pSchema) { + + int tlen = sizeof(STSchema) + sizeof(STColumn) * schemaNCols(pSchema); + STSchema *tSchema = (STSchema *)malloc(tlen); + if (tSchema == NULL) return NULL; + + memcpy((void *)tSchema, (void *)pSchema, tlen); + + return tSchema; +} + +/** + * Encode a schema to dst, and return the next pointer + */ +int tdEncodeSchema(void **buf, STSchema *pSchema) { + int tlen = 0; + tlen += taosEncodeFixedI32(buf, schemaVersion(pSchema)); + tlen += taosEncodeFixedI32(buf, schemaNCols(pSchema)); + + for (int i = 0; i < schemaNCols(pSchema); i++) { + STColumn *pCol = schemaColAt(pSchema, i); + tlen += taosEncodeFixedI8(buf, colType(pCol)); + tlen += taosEncodeFixedI16(buf, colColId(pCol)); + tlen += taosEncodeFixedI16(buf, colBytes(pCol)); + } + + return tlen; +} + +/** + * Decode a schema from a binary. + */ +void *tdDecodeSchema(void *buf, STSchema **pRSchema) { + int version = 0; + int numOfCols = 0; + STSchemaBuilder schemaBuilder; + + buf = taosDecodeFixedI32(buf, &version); + buf = taosDecodeFixedI32(buf, &numOfCols); + + if (tdInitTSchemaBuilder(&schemaBuilder, version) < 0) return NULL; + + for (int i = 0; i < numOfCols; i++) { + int8_t type = 0; + int16_t colId = 0; + int16_t bytes = 0; + buf = taosDecodeFixedI8(buf, &type); + buf = taosDecodeFixedI16(buf, &colId); + buf = taosDecodeFixedI16(buf, &bytes); + if (tdAddColToSchema(&schemaBuilder, type, colId, bytes) < 0) { + tdDestroyTSchemaBuilder(&schemaBuilder); + return NULL; + } + } + + *pRSchema = tdGetSchemaFromBuilder(&schemaBuilder); + tdDestroyTSchemaBuilder(&schemaBuilder); + return buf; +} + +int tdInitTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) { + if (pBuilder == NULL) return -1; + + pBuilder->tCols = 256; + pBuilder->columns = (STColumn *)malloc(sizeof(STColumn) * pBuilder->tCols); + if (pBuilder->columns == NULL) return -1; + + tdResetTSchemaBuilder(pBuilder, version); + return 0; +} + +void tdDestroyTSchemaBuilder(STSchemaBuilder *pBuilder) { + if (pBuilder) { + tfree(pBuilder->columns); + } +} + +void tdResetTSchemaBuilder(STSchemaBuilder *pBuilder, int32_t version) { + pBuilder->nCols = 0; + pBuilder->tlen = 0; + pBuilder->flen = 0; + pBuilder->vlen = 0; + pBuilder->version = version; +} + +int tdAddColToSchema(STSchemaBuilder *pBuilder, int8_t type, int16_t colId, int16_t bytes) { + if (!isValidDataType(type)) return -1; + + if (pBuilder->nCols >= pBuilder->tCols) { + pBuilder->tCols *= 2; + STColumn* columns = (STColumn *)realloc(pBuilder->columns, sizeof(STColumn) * pBuilder->tCols); + if (columns == NULL) return -1; + pBuilder->columns = columns; + } + + STColumn *pCol = &(pBuilder->columns[pBuilder->nCols]); + colSetType(pCol, type); + colSetColId(pCol, colId); + if (pBuilder->nCols == 0) { + colSetOffset(pCol, 0); + } else { + STColumn *pTCol = &(pBuilder->columns[pBuilder->nCols-1]); + colSetOffset(pCol, pTCol->offset + TYPE_BYTES[pTCol->type]); + } + + if (IS_VAR_DATA_TYPE(type)) { + colSetBytes(pCol, bytes); + pBuilder->tlen += (TYPE_BYTES[type] + bytes); + pBuilder->vlen += bytes - sizeof(VarDataLenT); + } else { + colSetBytes(pCol, TYPE_BYTES[type]); + pBuilder->tlen += TYPE_BYTES[type]; + pBuilder->vlen += TYPE_BYTES[type]; + } + + pBuilder->nCols++; + pBuilder->flen += TYPE_BYTES[type]; + + ASSERT(pCol->offset < pBuilder->flen); + + return 0; +} + +STSchema *tdGetSchemaFromBuilder(STSchemaBuilder *pBuilder) { + if (pBuilder->nCols <= 0) return NULL; + + int tlen = sizeof(STSchema) + sizeof(STColumn) * pBuilder->nCols; + + STSchema *pSchema = (STSchema *)malloc(tlen); + if (pSchema == NULL) return NULL; + + schemaVersion(pSchema) = pBuilder->version; + schemaNCols(pSchema) = pBuilder->nCols; + schemaTLen(pSchema) = pBuilder->tlen; + schemaFLen(pSchema) = pBuilder->flen; + schemaVLen(pSchema) = pBuilder->vlen; + + memcpy(schemaColAt(pSchema, 0), pBuilder->columns, sizeof(STColumn) * pBuilder->nCols); + + return pSchema; +} + +/** + * Initialize a data row + */ +void tdInitDataRow(SDataRow row, STSchema *pSchema) { + dataRowSetLen(row, TD_DATA_ROW_HEAD_SIZE + schemaFLen(pSchema)); + dataRowSetVersion(row, schemaVersion(pSchema)); +} + +SDataRow tdNewDataRowFromSchema(STSchema *pSchema) { + int32_t size = dataRowMaxBytesFromSchema(pSchema); + + SDataRow row = malloc(size); + if (row == NULL) return NULL; + + tdInitDataRow(row, pSchema); + return row; +} + +/** + * Free the SDataRow object + */ +void tdFreeDataRow(SDataRow row) { + if (row) free(row); +} + +SDataRow tdDataRowDup(SDataRow row) { + SDataRow trow = malloc(dataRowLen(row)); + if (trow == NULL) return NULL; + + dataRowCpy(trow, row); + return trow; +} + +SMemRow tdMemRowDup(SMemRow row) { + SMemRow trow = malloc(memRowTLen(row)); + if (trow == NULL) return NULL; + + memRowCpy(trow, row); + return trow; +} + +void dataColInit(SDataCol *pDataCol, STColumn *pCol, int maxPoints) { + pDataCol->type = colType(pCol); + pDataCol->colId = colColId(pCol); + pDataCol->bytes = colBytes(pCol); + pDataCol->offset = colOffset(pCol) + TD_DATA_ROW_HEAD_SIZE; + + pDataCol->len = 0; +} +// value from timestamp should be TKEY here instead of TSKEY +int dataColAppendVal(SDataCol *pCol, const void *value, int numOfRows, int maxPoints) { + ASSERT(pCol != NULL && value != NULL); + + if (isAllRowsNull(pCol)) { + if (isNull(value, pCol->type)) { + // all null value yet, just return + return 0; + } + + if(tdAllocMemForCol(pCol, maxPoints) < 0) return -1; + if (numOfRows > 0) { + // Find the first not null value, fill all previouse values as NULL + dataColSetNEleNull(pCol, numOfRows); + } + } + + if (IS_VAR_DATA_TYPE(pCol->type)) { + // set offset + pCol->dataOff[numOfRows] = pCol->len; + // Copy data + memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, varDataTLen(value)); + // Update the length + pCol->len += varDataTLen(value); + } else { + ASSERT(pCol->len == TYPE_BYTES[pCol->type] * numOfRows); + memcpy(POINTER_SHIFT(pCol->pData, pCol->len), value, pCol->bytes); + pCol->len += pCol->bytes; + } + return 0; +} + +static FORCE_INLINE const void *tdGetColDataOfRowUnsafe(SDataCol *pCol, int row) { + if (IS_VAR_DATA_TYPE(pCol->type)) { + return POINTER_SHIFT(pCol->pData, pCol->dataOff[row]); + } else { + return POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * row); + } +} + +bool isNEleNull(SDataCol *pCol, int nEle) { + if(isAllRowsNull(pCol)) return true; + for (int i = 0; i < nEle; i++) { + if (!isNull(tdGetColDataOfRowUnsafe(pCol, i), pCol->type)) return false; + } + return true; +} + +static FORCE_INLINE void dataColSetNullAt(SDataCol *pCol, int index) { + if (IS_VAR_DATA_TYPE(pCol->type)) { + pCol->dataOff[index] = pCol->len; + char *ptr = POINTER_SHIFT(pCol->pData, pCol->len); + setVardataNull(ptr, pCol->type); + pCol->len += varDataTLen(ptr); + } else { + setNull(POINTER_SHIFT(pCol->pData, TYPE_BYTES[pCol->type] * index), pCol->type, pCol->bytes); + pCol->len += TYPE_BYTES[pCol->type]; + } +} + +static void dataColSetNEleNull(SDataCol *pCol, int nEle) { + if (IS_VAR_DATA_TYPE(pCol->type)) { + pCol->len = 0; + for (int i = 0; i < nEle; i++) { + dataColSetNullAt(pCol, i); + } + } else { + setNullN(pCol->pData, pCol->type, pCol->bytes, nEle); + pCol->len = TYPE_BYTES[pCol->type] * nEle; + } +} + +void dataColSetOffset(SDataCol *pCol, int nEle) { + ASSERT(((pCol->type == TSDB_DATA_TYPE_BINARY) || (pCol->type == TSDB_DATA_TYPE_NCHAR))); + + void *tptr = pCol->pData; + // char *tptr = (char *)(pCol->pData); + + VarDataOffsetT offset = 0; + for (int i = 0; i < nEle; i++) { + pCol->dataOff[i] = offset; + offset += varDataTLen(tptr); + tptr = POINTER_SHIFT(tptr, varDataTLen(tptr)); + } +} + +SDataCols *tdNewDataCols(int maxCols, int maxRows) { + SDataCols *pCols = (SDataCols *)calloc(1, sizeof(SDataCols)); + if (pCols == NULL) { + uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCols), strerror(errno)); + return NULL; + } + + pCols->maxPoints = maxRows; + pCols->maxCols = maxCols; + pCols->numOfRows = 0; + pCols->numOfCols = 0; + + if (maxCols > 0) { + pCols->cols = (SDataCol *)calloc(maxCols, sizeof(SDataCol)); + if (pCols->cols == NULL) { + uDebug("malloc failure, size:%" PRId64 " failed, reason:%s", (int64_t)sizeof(SDataCol) * maxCols, + strerror(errno)); + tdFreeDataCols(pCols); + return NULL; + } + int i; + for(i = 0; i < maxCols; i++) { + pCols->cols[i].spaceSize = 0; + pCols->cols[i].len = 0; + pCols->cols[i].pData = NULL; + pCols->cols[i].dataOff = NULL; + } + } + + return pCols; +} + +int tdInitDataCols(SDataCols *pCols, STSchema *pSchema) { + int i; + int oldMaxCols = pCols->maxCols; + if (schemaNCols(pSchema) > oldMaxCols) { + pCols->maxCols = schemaNCols(pSchema); + void* ptr = (SDataCol *)realloc(pCols->cols, sizeof(SDataCol) * pCols->maxCols); + if (ptr == NULL) return -1; + pCols->cols = ptr; + for(i = oldMaxCols; i < pCols->maxCols; i++) { + pCols->cols[i].pData = NULL; + pCols->cols[i].dataOff = NULL; + pCols->cols[i].spaceSize = 0; + } + } + + tdResetDataCols(pCols); + pCols->numOfCols = schemaNCols(pSchema); + + for (i = 0; i < schemaNCols(pSchema); i++) { + dataColInit(pCols->cols + i, schemaColAt(pSchema, i), pCols->maxPoints); + } + + return 0; +} + +SDataCols *tdFreeDataCols(SDataCols *pCols) { + int i; + if (pCols) { + if(pCols->cols) { + int maxCols = pCols->maxCols; + for(i = 0; i < maxCols; i++) { + SDataCol *pCol = &pCols->cols[i]; + tfree(pCol->pData); + } + free(pCols->cols); + pCols->cols = NULL; + } + free(pCols); + } + return NULL; +} + +SDataCols *tdDupDataCols(SDataCols *pDataCols, bool keepData) { + SDataCols *pRet = tdNewDataCols(pDataCols->maxCols, pDataCols->maxPoints); + if (pRet == NULL) return NULL; + + pRet->numOfCols = pDataCols->numOfCols; + pRet->sversion = pDataCols->sversion; + if (keepData) pRet->numOfRows = pDataCols->numOfRows; + + for (int i = 0; i < pDataCols->numOfCols; i++) { + pRet->cols[i].type = pDataCols->cols[i].type; + pRet->cols[i].colId = pDataCols->cols[i].colId; + pRet->cols[i].bytes = pDataCols->cols[i].bytes; + pRet->cols[i].offset = pDataCols->cols[i].offset; + + if (keepData) { + if (pDataCols->cols[i].len > 0) { + if(tdAllocMemForCol(&pRet->cols[i], pRet->maxPoints) < 0) { + tdFreeDataCols(pRet); + return NULL; + } + pRet->cols[i].len = pDataCols->cols[i].len; + memcpy(pRet->cols[i].pData, pDataCols->cols[i].pData, pDataCols->cols[i].len); + if (IS_VAR_DATA_TYPE(pRet->cols[i].type)) { + int dataOffSize = sizeof(VarDataOffsetT) * pDataCols->maxPoints; + memcpy(pRet->cols[i].dataOff, pDataCols->cols[i].dataOff, dataOffSize); + } + } + } + } + + return pRet; +} + +void tdResetDataCols(SDataCols *pCols) { + if (pCols != NULL) { + pCols->numOfRows = 0; + for (int i = 0; i < pCols->maxCols; i++) { + dataColReset(pCols->cols + i); + } + } +} + +static void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { + ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < dataRowKey(row)); + + int rcol = 0; + int dcol = 0; + + while (dcol < pCols->numOfCols) { + bool setCol = 0; + SDataCol *pDataCol = &(pCols->cols[dcol]); + if (rcol >= schemaNCols(pSchema)) { + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + dcol++; + continue; + } + + STColumn *pRowCol = schemaColAt(pSchema, rcol); + if (pRowCol->colId == pDataCol->colId) { + void *value = tdGetRowDataOfCol(row, pRowCol->type, pRowCol->offset + TD_DATA_ROW_HEAD_SIZE); + if(!isNull(value, pDataCol->type)) setCol = 1; + dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); + dcol++; + rcol++; + } else if (pRowCol->colId < pDataCol->colId) { + rcol++; + } else { + if(forceSetNull || setCol) { + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + } + dcol++; + } + } + pCols->numOfRows++; +} + +static void tdAppendKvRowToDataCol(SKVRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { + ASSERT(pCols->numOfRows == 0 || dataColsKeyLast(pCols) < kvRowKey(row)); + + int rcol = 0; + int dcol = 0; + + int nRowCols = kvRowNCols(row); + + while (dcol < pCols->numOfCols) { + bool setCol = 0; + SDataCol *pDataCol = &(pCols->cols[dcol]); + if (rcol >= nRowCols || rcol >= schemaNCols(pSchema)) { + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + ++dcol; + continue; + } + + SColIdx *colIdx = kvRowColIdxAt(row, rcol); + + if (colIdx->colId == pDataCol->colId) { + void *value = tdGetKvRowDataOfCol(row, colIdx->offset); + if(!isNull(value, pDataCol->type)) setCol = 1; + dataColAppendVal(pDataCol, value, pCols->numOfRows, pCols->maxPoints); + ++dcol; + ++rcol; + } else if (colIdx->colId < pDataCol->colId) { + ++rcol; + } else { + if(forceSetNull || setCol) { + dataColAppendVal(pDataCol, getNullValue(pDataCol->type), pCols->numOfRows, pCols->maxPoints); + } + ++dcol; + } + } + pCols->numOfRows++; +} + +void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols, bool forceSetNull) { + if (isDataRow(row)) { + tdAppendDataRowToDataCol(memRowDataBody(row), pSchema, pCols, forceSetNull); + } else if (isKvRow(row)) { + tdAppendKvRowToDataCol(memRowKvBody(row), pSchema, pCols, forceSetNull); + } else { + ASSERT(0); + } +} + +int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset, bool forceSetNull) { + ASSERT(rowsToMerge > 0 && rowsToMerge <= source->numOfRows); + ASSERT(target->numOfCols == source->numOfCols); + int offset = 0; + + if (pOffset == NULL) { + pOffset = &offset; + } + + SDataCols *pTarget = NULL; + + if ((target->numOfRows == 0) || (dataColsKeyLast(target) < dataColsKeyAtRow(source, *pOffset))) { // No overlap + ASSERT(target->numOfRows + rowsToMerge <= target->maxPoints); + for (int i = 0; i < rowsToMerge; i++) { + for (int j = 0; j < source->numOfCols; j++) { + if (source->cols[j].len > 0 || target->cols[j].len > 0) { + dataColAppendVal(target->cols + j, tdGetColDataOfRow(source->cols + j, i + (*pOffset)), target->numOfRows, + target->maxPoints); + } + } + target->numOfRows++; + } + (*pOffset) += rowsToMerge; + } else { + pTarget = tdDupDataCols(target, true); + if (pTarget == NULL) goto _err; + + int iter1 = 0; + tdMergeTwoDataCols(target, pTarget, &iter1, pTarget->numOfRows, source, pOffset, source->numOfRows, + pTarget->numOfRows + rowsToMerge, forceSetNull); + } + + tdFreeDataCols(pTarget); + return 0; + +_err: + tdFreeDataCols(pTarget); + return -1; +} + +// src2 data has more priority than src1 +static void tdMergeTwoDataCols(SDataCols *target, SDataCols *src1, int *iter1, int limit1, SDataCols *src2, int *iter2, + int limit2, int tRows, bool forceSetNull) { + tdResetDataCols(target); + ASSERT(limit1 <= src1->numOfRows && limit2 <= src2->numOfRows); + + while (target->numOfRows < tRows) { + if (*iter1 >= limit1 && *iter2 >= limit2) break; + + TSKEY key1 = (*iter1 >= limit1) ? INT64_MAX : dataColsKeyAt(src1, *iter1); + TKEY tkey1 = (*iter1 >= limit1) ? TKEY_NULL : dataColsTKeyAt(src1, *iter1); + TSKEY key2 = (*iter2 >= limit2) ? INT64_MAX : dataColsKeyAt(src2, *iter2); + TKEY tkey2 = (*iter2 >= limit2) ? TKEY_NULL : dataColsTKeyAt(src2, *iter2); + + ASSERT(tkey1 == TKEY_NULL || (!TKEY_IS_DELETED(tkey1))); + + if (key1 < key2) { + for (int i = 0; i < src1->numOfCols; i++) { + ASSERT(target->cols[i].type == src1->cols[i].type); + if (src1->cols[i].len > 0 || target->cols[i].len > 0) { + dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, + target->maxPoints); + } + } + + target->numOfRows++; + (*iter1)++; + } else if (key1 >= key2) { + if ((key1 > key2) || (key1 == key2 && !TKEY_IS_DELETED(tkey2))) { + for (int i = 0; i < src2->numOfCols; i++) { + ASSERT(target->cols[i].type == src2->cols[i].type); + if (src2->cols[i].len > 0 && !isNull(src2->cols[i].pData, src2->cols[i].type)) { + dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src2->cols + i, *iter2), target->numOfRows, + target->maxPoints); + } else if(!forceSetNull && key1 == key2 && src1->cols[i].len > 0) { + dataColAppendVal(&(target->cols[i]), tdGetColDataOfRow(src1->cols + i, *iter1), target->numOfRows, + target->maxPoints); + } else if(target->cols[i].len > 0) { + dataColSetNullAt(&target->cols[i], target->numOfRows); + } + } + target->numOfRows++; + } + + (*iter2)++; + if (key1 == key2) (*iter1)++; + } + + ASSERT(target->numOfRows <= target->maxPoints); + } +} + +SKVRow tdKVRowDup(SKVRow row) { + SKVRow trow = malloc(kvRowLen(row)); + if (trow == NULL) return NULL; + + kvRowCpy(trow, row); + return trow; +} + +static int compareColIdx(const void* a, const void* b) { + const SColIdx* x = (const SColIdx*)a; + const SColIdx* y = (const SColIdx*)b; + if (x->colId > y->colId) { + return 1; + } + if (x->colId < y->colId) { + return -1; + } + return 0; +} + +void tdSortKVRowByColIdx(SKVRow row) { + qsort(kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), compareColIdx); +} + +int tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value) { + SColIdx *pColIdx = NULL; + SKVRow row = *orow; + SKVRow nrow = NULL; + void * ptr = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_GE); + + if (ptr == NULL || ((SColIdx *)ptr)->colId > colId) { // need to add a column value to the row + int diff = IS_VAR_DATA_TYPE(type) ? varDataTLen(value) : TYPE_BYTES[type]; + int nRowLen = kvRowLen(row) + sizeof(SColIdx) + diff; + int oRowCols = kvRowNCols(row); + + ASSERT(diff > 0); + nrow = malloc(nRowLen); + if (nrow == NULL) return -1; + + kvRowSetLen(nrow, nRowLen); + kvRowSetNCols(nrow, oRowCols + 1); + + memcpy(kvRowColIdx(nrow), kvRowColIdx(row), sizeof(SColIdx) * oRowCols); + memcpy(kvRowValues(nrow), kvRowValues(row), kvRowValLen(row)); + + pColIdx = kvRowColIdxAt(nrow, oRowCols); + pColIdx->colId = colId; + pColIdx->offset = kvRowValLen(row); + + memcpy(kvRowColVal(nrow, pColIdx), value, diff); // copy new value + + tdSortKVRowByColIdx(nrow); + + *orow = nrow; + free(row); + } else { + ASSERT(((SColIdx *)ptr)->colId == colId); + if (IS_VAR_DATA_TYPE(type)) { + void *pOldVal = kvRowColVal(row, (SColIdx *)ptr); + + if (varDataTLen(value) == varDataTLen(pOldVal)) { // just update the column value in place + memcpy(pOldVal, value, varDataTLen(value)); + } else { // need to reallocate the memory + int16_t nlen = kvRowLen(row) + (varDataTLen(value) - varDataTLen(pOldVal)); + ASSERT(nlen > 0); + nrow = malloc(nlen); + if (nrow == NULL) return -1; + + kvRowSetLen(nrow, nlen); + kvRowSetNCols(nrow, kvRowNCols(row)); + + int zsize = sizeof(SColIdx) * kvRowNCols(row) + ((SColIdx *)ptr)->offset; + memcpy(kvRowColIdx(nrow), kvRowColIdx(row), zsize); + memcpy(kvRowColVal(nrow, ((SColIdx *)ptr)), value, varDataTLen(value)); + // Copy left value part + int lsize = kvRowLen(row) - TD_KV_ROW_HEAD_SIZE - zsize - varDataTLen(pOldVal); + if (lsize > 0) { + memcpy(POINTER_SHIFT(nrow, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(value)), + POINTER_SHIFT(row, TD_KV_ROW_HEAD_SIZE + zsize + varDataTLen(pOldVal)), lsize); + } + + for (int i = 0; i < kvRowNCols(nrow); i++) { + pColIdx = kvRowColIdxAt(nrow, i); + + if (pColIdx->offset > ((SColIdx *)ptr)->offset) { + pColIdx->offset = pColIdx->offset - varDataTLen(pOldVal) + varDataTLen(value); + } + } + + *orow = nrow; + free(row); + } + } else { + memcpy(kvRowColVal(row, (SColIdx *)ptr), value, TYPE_BYTES[type]); + } + } + + return 0; +} + +int tdEncodeKVRow(void **buf, SKVRow row) { + // May change the encode purpose + if (buf != NULL) { + kvRowCpy(*buf, row); + *buf = POINTER_SHIFT(*buf, kvRowLen(row)); + } + + return kvRowLen(row); +} + +void *tdDecodeKVRow(void *buf, SKVRow *row) { + *row = tdKVRowDup(buf); + if (*row == NULL) return NULL; + return POINTER_SHIFT(buf, kvRowLen(*row)); +} + +int tdInitKVRowBuilder(SKVRowBuilder *pBuilder) { + pBuilder->tCols = 128; + pBuilder->nCols = 0; + pBuilder->pColIdx = (SColIdx *)malloc(sizeof(SColIdx) * pBuilder->tCols); + if (pBuilder->pColIdx == NULL) return -1; + pBuilder->alloc = 1024; + pBuilder->size = 0; + pBuilder->buf = malloc(pBuilder->alloc); + if (pBuilder->buf == NULL) { + free(pBuilder->pColIdx); + return -1; + } + return 0; +} + +void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder) { + tfree(pBuilder->pColIdx); + tfree(pBuilder->buf); +} + +void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) { + pBuilder->nCols = 0; + pBuilder->size = 0; +} + +SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) { + int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size; + if (tlen == 0) return NULL; + + tlen += TD_KV_ROW_HEAD_SIZE; + + SKVRow row = malloc(tlen); + if (row == NULL) return NULL; + + kvRowSetNCols(row, pBuilder->nCols); + kvRowSetLen(row, tlen); + + memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols); + memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size); + + return row; +} + +SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2) { +#if 0 + ASSERT(memRowKey(row1) == memRowKey(row2)); + ASSERT(schemaVersion(pSchema1) == memRowVersion(row1)); + ASSERT(schemaVersion(pSchema2) == memRowVersion(row2)); + ASSERT(schemaVersion(pSchema1) >= schemaVersion(pSchema2)); +#endif + + SArray *stashRow = taosArrayInit(pSchema1->numOfCols, sizeof(SColInfo)); + if (stashRow == NULL) { + return NULL; + } + + SMemRow pRow = buffer; + SDataRow dataRow = memRowDataBody(pRow); + memRowSetType(pRow, SMEM_ROW_DATA); + dataRowSetVersion(dataRow, schemaVersion(pSchema1)); // use latest schema version + dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pSchema1->flen)); + + TDRowLenT dataLen = 0, kvLen = TD_MEM_ROW_KV_HEAD_SIZE; + + int32_t i = 0; // row1 + int32_t j = 0; // row2 + int32_t nCols1 = schemaNCols(pSchema1); + int32_t nCols2 = schemaNCols(pSchema2); + SColInfo colInfo = {0}; + int32_t kvIdx1 = 0, kvIdx2 = 0; + + while (i < nCols1) { + STColumn *pCol = schemaColAt(pSchema1, i); + void * val1 = tdGetMemRowDataOfColEx(row1, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx1); + // if val1 != NULL, use val1; + if (val1 != NULL && !isNull(val1, pCol->type)) { + tdAppendColVal(dataRow, val1, pCol->type, pCol->offset); + kvLen += tdGetColAppendLen(SMEM_ROW_KV, val1, pCol->type); + setSColInfo(&colInfo, pCol->colId, pCol->type, val1); + taosArrayPush(stashRow, &colInfo); + ++i; // next col + continue; + } + + void *val2 = NULL; + while (j < nCols2) { + STColumn *tCol = schemaColAt(pSchema2, j); + if (tCol->colId < pCol->colId) { + ++j; + continue; + } + if (tCol->colId == pCol->colId) { + val2 = tdGetMemRowDataOfColEx(row2, tCol->colId, tCol->type, TD_DATA_ROW_HEAD_SIZE + tCol->offset, &kvIdx2); + } else if (tCol->colId > pCol->colId) { + // set NULL + } + break; + } // end of while(jtype); + } + tdAppendColVal(dataRow, val2, pCol->type, pCol->offset); + if (!isNull(val2, pCol->type)) { + kvLen += tdGetColAppendLen(SMEM_ROW_KV, val2, pCol->type); + setSColInfo(&colInfo, pCol->colId, pCol->type, val2); + taosArrayPush(stashRow, &colInfo); + } + + ++i; // next col + } + + dataLen = memRowTLen(pRow); + + if (kvLen < dataLen) { + // scan stashRow and generate SKVRow + memset(buffer, 0, sizeof(dataLen)); + SMemRow tRow = buffer; + memRowSetType(tRow, SMEM_ROW_KV); + SKVRow kvRow = (SKVRow)memRowKvBody(tRow); + int16_t nKvNCols = (int16_t) taosArrayGetSize(stashRow); + kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nKvNCols)); + kvRowSetNCols(kvRow, nKvNCols); + memRowSetKvVersion(tRow, pSchema1->version); + + int32_t toffset = 0; + int16_t k; + for (k = 0; k < nKvNCols; ++k) { + SColInfo *pColInfo = taosArrayGet(stashRow, k); + tdAppendKvColVal(kvRow, pColInfo->colVal, true, pColInfo->colId, pColInfo->colType, toffset); + toffset += sizeof(SColIdx); + } + ASSERT(kvLen == memRowTLen(tRow)); + } + taosArrayDestroy(stashRow); + return buffer; +} + #endif \ No newline at end of file diff --git a/source/dnode/vnode/src/tsdb/tsdbRead.c b/source/dnode/vnode/src/tsdb/tsdbRead.c index 15748118d7..7099e1f300 100644 --- a/source/dnode/vnode/src/tsdb/tsdbRead.c +++ b/source/dnode/vnode/src/tsdb/tsdbRead.c @@ -699,7 +699,7 @@ static bool initTableMemIterator(STsdbReadHandle* pHandle, STableCheckInfo* pChe STbData** pMem = NULL; STbData** pIMem = NULL; - TKEY tLastKey = 0; /// keyToTkey(pCheckInfo->lastKey); + TSKEY tLastKey = 0; /// keyToTkey(pCheckInfo->lastKey); if (pHandle->pTsdb->mem != NULL) { pMem = taosHashGet(pHandle->pTsdb->mem->pHashIdx, &pCheckInfo->tableId, sizeof(pCheckInfo->tableId)); if (pMem != NULL) { @@ -1661,11 +1661,14 @@ static void mergeTwoRowFromMem(STsdbReadHandle* pTsdbReadHandle, int32_t capacit SET_DOUBLE_PTR(pData, value); break; case TSDB_DATA_TYPE_TIMESTAMP: +#if 0 // only TSKEY supported since 3.0 if (pColInfo->info.colId == PRIMARYKEY_TIMESTAMP_COL_ID) { *(TSKEY *)pData = tdGetKey(*(TKEY *)value); } else { *(TSKEY *)pData = *(TSKEY *)value; } +#endif + *(TSKEY*)pData = *(TSKEY*)value; break; default: memcpy(pData, value, pColInfo->info.bytes);