diff --git a/include/common/tdataformat.h b/include/common/tdataformat.h index cc30cd78f5..6fbba034bc 100644 --- a/include/common/tdataformat.h +++ b/include/common/tdataformat.h @@ -328,6 +328,7 @@ typedef struct SDataCol { int len; // column data length VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column void * pData; // Actual data pointer + void * pBitmap; // Bitmap pointer to support None value TSKEY ts; // only used in last NULL column } SDataCol; @@ -572,7 +573,7 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, return 0; } -// ----------------- SMemRow appended with sequential data row structure +// ----------------- SRow appended with tuple row structure /* * |---------|------------------------------------------------- len ---------------------------------->| * |<-------- Head ------>|<--------- flen -------------->| | @@ -585,7 +586,7 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, * NOTE: timestamp in this row structure is TKEY instead of TSKEY */ -// ----------------- SMemRow appended with extended K-V data row structure +// ----------------- SRow appended with extended K-V data row structure /* |--------------------|------------------------------------------------ len ---------------------------------->| * |<------------- Head ------------>|<--------- flen -------------->| | * |--------------------+----------+--------------------------------------------+---------------------------------+ @@ -605,22 +606,17 @@ typedef void *SMemRow; #define SMEM_ROW_DATA 0x0U // SDataRow #define SMEM_ROW_KV 0x01U // SKVRow -#define SMEM_ROW_CONVERT 0x80U // SMemRow convert flag -#define KVRatioKV (0.2f) // all bool -#define KVRatioPredict (0.4f) -#define KVRatioData (0.75f) // all bigint #define KVRatioConvert (0.9f) #define memRowType(r) ((*(uint8_t *)(r)) & 0x01) #define memRowSetType(r, t) ((*(uint8_t *)(r)) = (t)) // set the total byte in case of dirty memory -#define memRowSetConvert(r) ((*(uint8_t *)(r)) = (((*(uint8_t *)(r)) & 0x7F) | SMEM_ROW_CONVERT)) // highest bit #define isDataRowT(t) (SMEM_ROW_DATA == (((uint8_t)(t)) & 0x01)) #define isDataRow(r) (SMEM_ROW_DATA == memRowType(r)) #define isKvRowT(t) (SMEM_ROW_KV == (((uint8_t)(t)) & 0x01)) #define isKvRow(r) (SMEM_ROW_KV == memRowType(r)) -#define isNeedConvertRow(r) (((*(uint8_t *)(r)) & 0x80) == SMEM_ROW_CONVERT) +#define isUtilizeKVRow(k, d) ((k) < ((d)*KVRatioConvert)) #define memRowDataBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) // section after flag #define memRowKvBody(r) \ @@ -630,9 +626,9 @@ typedef void *SMemRow; #define memRowKvLen(r) (*(TDRowLenT *)memRowKvBody(r)) // 0~65535 #define memRowDataTLen(r) \ - ((TDRowTLenT)(memRowDataLen(r) + TD_MEM_ROW_TYPE_SIZE)) // using uint32_t/int32_t to store the TLen + ((TDRowLenT)(memRowDataLen(r) + TD_MEM_ROW_TYPE_SIZE)) // using uint32_t/int32_t to store the TLen -#define memRowKvTLen(r) ((TDRowTLenT)(memRowKvLen(r) + TD_MEM_ROW_KV_TYPE_VER_SIZE)) +#define memRowKvTLen(r) ((TDRowLenT)(memRowKvLen(r) + TD_MEM_ROW_KV_TYPE_VER_SIZE)) #define memRowLen(r) (isDataRow(r) ? memRowDataLen(r) : memRowKvLen(r)) #define memRowTLen(r) (isDataRow(r) ? memRowDataTLen(r) : memRowKvTLen(r)) // using uint32_t/int32_t to store the TLen @@ -774,7 +770,7 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch */ #define PAYLOAD_NCOLS_LEN sizeof(uint16_t) -#define PAYLOAD_NCOLS_OFFSET (sizeof(uint8_t) + sizeof(TDRowTLenT)) +#define PAYLOAD_NCOLS_OFFSET (sizeof(uint8_t) + sizeof(TDRowLenT)) #define PAYLOAD_HEADER_LEN (PAYLOAD_NCOLS_OFFSET + PAYLOAD_NCOLS_LEN) #define PAYLOAD_ID_LEN sizeof(int16_t) #define PAYLOAD_ID_TYPE_LEN (sizeof(int16_t) + sizeof(uint8_t)) @@ -784,7 +780,7 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch #define payloadBody(r) POINTER_SHIFT(r, PAYLOAD_HEADER_LEN) #define payloadType(r) (*(uint8_t *)(r)) #define payloadSetType(r, t) (payloadType(r) = (t)) -#define payloadTLen(r) (*(TDRowTLenT *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) // including total header +#define payloadTLen(r) (*(TDRowLenT *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) // including total header #define payloadSetTLen(r, l) (payloadTLen(r) = (l)) #define payloadNCols(r) (*(TDRowLenT *)POINTER_SHIFT(r, PAYLOAD_NCOLS_OFFSET)) #define payloadSetNCols(r, n) (payloadNCols(r) = (n)) diff --git a/include/common/trow.h b/include/common/trow.h index 4092500d5f..f562521dad 100644 --- a/include/common/trow.h +++ b/include/common/trow.h @@ -26,13 +26,102 @@ extern "C" { #endif -#define TD_UNDECIDED_ROW 0 -#define TD_OR_ROW 1 -#define TD_KV_ROW 2 +#define TD_SUPPORT_NONE_VAL + +// row type +#define TD_ROW_TP 0 // default +#define TD_ROW_KV 1 + +// val type +#define TD_VTYPE_NORM 0x0 // normal val: not none, not null +#define TD_VTYPE_NONE 0x01 // none or unknown/undefined +#define TD_VTYPE_NULL 0x02 // null val + +#ifdef TD_SUPPORT_NONE_VAL +static FORCE_INLINE bool tdValTypeIsNorm(int8_t valType) { return (valType & TD_VTYPE_NORM); } +static FORCE_INLINE bool tdValTypeIsNone(int8_t valType) { return (valType & TD_VTYPE_NONE); } +static FORCE_INLINE bool tdValTypeIsNull(int8_t valType) { return (valType & TD_VTYPE_NULL); } +#else + +#endif + +static FORCE_INLINE bool tdValIsNorm(int8_t valType, const void *val, int32_t type) { +#ifdef TD_SUPPORT_NONE_VAL + return tdValTypeIsNorm(valType); +#else + return !isNull(val, type); +#endif +} +static FORCE_INLINE bool tdValIsNone(int8_t valType) { +#ifdef TD_SUPPORT_NONE_VAL + return tdValTypeIsNone(valType); +#else + return false; +#endif +} +static FORCE_INLINE bool tdValIsNull(int8_t valType, const void *val, int32_t type) { +#ifdef TD_SUPPORT_NONE_VAL + return tdValTypeIsNull(valType); +#else + return isNull(val, type); +#endif +} + +#define TD_ROW_LEN(r) +#define TD_ROW_TLEN(r) +#define TD_ROW_TYPE(r) +#define TD_ROW_BODY(r) +#define TD_ROW_TKEY(r) +#define TD_ROW_KEY(r) +#define TD_ROW_DELETED(r) +#define TD_ROW_VERSION(r) +#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s) + +#define TD_ROW_SET_LEN(r, l) +#define TD_ROW_SET_VERSION(r, v) +#define TD_ROW_CPY(dst, r) + +// ----------------- SRow appended with tuple row structure(STpRow) +/* + * |---------|------------------------------------------------- len -------------------------------------->| + * |<-------- Head ------>|<--------- flen ------------->|<-- blen -->| | + * |---------+---------------------+---------------------------------+-------------+-----------------------+ + * | uint8_t | uint32_t | int16_t | | | | + * |---------+----------+----------+---------------------------------+-------------------------------------+ + * | flag | len | sversion |(key) first part | bitmap | second part | + * +---------+----------+----------+---------------------------------+-------------------------------------+ + * + * NOTE: Timestamp in this row structure is TKEY instead of TSKEY + * Use 2 bits in bitmap for each column + * flag: + * 0: flag&0x01 0 STpRow, 1 SKvRow // since 2.0 + * 1: flag&0x02 0 without bitmap, 1 with bitmap. // 如果None值支持数据库或者更小维度,则需要指定一个bit区分。TODO + * 2: endian(0 big endian, 1 little endian) + * 3-7: reserved(value 0) + */ + +// ----------------- SRow appended with K-V row structure(SKvRow) +/* |--------------------|------------------------------------------------ len --------------------------->| + * |<-------- Head ---->|<--------- colsIdxLen ---------->|<-- blen -->| | + * |--------------------+----------+-----------------------------------------------------------------------+ + * | uint8_t | uint32_t | int16_t | | | | + * |---------+----------+----------+-----------------------------------------------------------------------+ + * | flag | len | ncols |(keyColId) cols index | bitmap | data part | + * |---------+----------+----------+---------------------------------+-------------------------------------+ + * + * NOTE: Timestamp in this row structure is TKEY instead of TSKEY + */ + +typedef void *SRow; + +typedef struct { + int8_t valType; + void * val; +} SCellVal; typedef struct { // TODO -} SOrRow; +} STpRow; // tuple typedef struct { col_id_t cid; diff --git a/include/common/ttypes.h b/include/common/ttypes.h index 991bb5c6be..e97acdbee4 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -10,8 +10,7 @@ extern "C" { // ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR typedef int32_t VarDataOffsetT; -typedef uint16_t TDRowLenT; // not including overhead: 0 ~ 65535 -typedef uint32_t TDRowTLenT; // total length, including overhead +typedef uint32_t TDRowLenT; typedef struct tstr { VarDataLenT len; diff --git a/include/util/types.h b/include/util/types.h index 35cb614f6f..dca8cd9a91 100644 --- a/include/util/types.h +++ b/include/util/types.h @@ -79,7 +79,6 @@ typedef int16_t VarDataLenT; // maxVarDataLen: 32767 #define varDataVal(v) ((void *)((char *)v + VARSTR_HEADER_SIZE)) typedef int32_t VarDataOffsetT; -typedef int16_t VarDataLenT; // maxVarDataLen: 32767 #ifdef __cplusplus } diff --git a/source/common/src/tdataformat.c b/source/common/src/tdataformat.c index 94d93b651a..075c9a1628 100644 --- a/source/common/src/tdataformat.c +++ b/source/common/src/tdataformat.c @@ -792,7 +792,7 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch dataRowSetVersion(dataRow, schemaVersion(pSchema1)); // use latest schema version dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pSchema1->flen)); - TDRowTLenT dataLen = 0, kvLen = TD_MEM_ROW_KV_HEAD_SIZE; + TDRowLenT dataLen = 0, kvLen = TD_MEM_ROW_KV_HEAD_SIZE; int32_t i = 0; // row1 int32_t j = 0; // row2 diff --git a/source/libs/parser/inc/dataBlockMgt.h b/source/libs/parser/inc/dataBlockMgt.h index cd84222c65..6e51f8b31f 100644 --- a/source/libs/parser/inc/dataBlockMgt.h +++ b/source/libs/parser/inc/dataBlockMgt.h @@ -55,11 +55,12 @@ typedef struct SParsedDataColInfo { int16_t numOfCols; int16_t numOfBound; uint16_t flen; // TODO: get from STSchema - uint16_t allNullLen; // TODO: get from STSchema + uint16_t allNullLen; // TODO: get from STSchema(base on SDataRow) uint16_t extendedVarLen; - int32_t *boundedColumns; // bound column idx according to schema - SBoundColumn *cols; - SBoundIdxInfo *colIdxInfo; + uint16_t boundNullLen; // bound column len with all NULL value(without VarDataOffsetT/SColIdx part) + int32_t * boundedColumns; // bound column idx according to schema + SBoundColumn * cols; + SBoundIdxInfo *colIdxInfo; int8_t orderStatus; // bound columns } SParsedDataColInfo; @@ -71,7 +72,7 @@ typedef struct SMemRowInfo { typedef struct { uint8_t memRowType; // default is 0, that is SDataRow uint8_t compareStat; // 0 no need, 1 need compare - TDRowTLenT kvRowInitLen; + int32_t rowSize; SMemRowInfo *rowInfo; } SMemRowBuilder; @@ -143,16 +144,6 @@ static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowTyp } } -static FORCE_INLINE void convertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) { - if (isDataRow(row)) { - if (kvLen < (dataLen * KVRatioConvert)) { - memRowSetConvert(row); - } - } else if (kvLen > dataLen) { - memRowSetConvert(row); - } -} - static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) { pBlocks->tid = pTableMeta->suid; pBlocks->uid = pTableMeta->uid; @@ -171,7 +162,7 @@ int32_t boundIdxCompar(const void *lhs, const void *rhs); void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols); void destroyBoundColumnInfo(SParsedDataColInfo* pColList); void destroyBlockArrayList(SArray* pDataBlockList); -int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen); +int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo); int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows); int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList); diff --git a/source/libs/parser/src/dataBlockMgt.c b/source/libs/parser/src/dataBlockMgt.c index bc4eae7ae9..ca8813c6cb 100644 --- a/source/libs/parser/src/dataBlockMgt.c +++ b/source/libs/parser/src/dataBlockMgt.c @@ -448,18 +448,11 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB } } else { for (int32_t i = 0; i < numOfRows; ++i) { - char* payload = (blkKeyTuple + i)->payloadAddr; - if (isNeedConvertRow(payload)) { - convertSMemRow(pDataBlock, payload, pTableDataBlock); - TDRowTLenT rowTLen = memRowTLen(pDataBlock); - pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); - pBlock->dataLen += rowTLen; - } else { - TDRowTLenT rowTLen = memRowTLen(payload); - memcpy(pDataBlock, payload, rowTLen); - pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); - pBlock->dataLen += rowTLen; - } + char* payload = (blkKeyTuple + i)->payloadAddr; + TDRowLenT rowTLen = memRowTLen(payload); + memcpy(pDataBlock, payload, rowTLen); + pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); + pBlock->dataLen += rowTLen; } } @@ -577,8 +570,8 @@ int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t return TSDB_CODE_SUCCESS; } -int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen) { - ASSERT(nRows >= 0 && nCols > 0 && (nBoundCols <= nCols)); +int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo) { + ASSERT(nRows >= 0 && pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols)); if (nRows > 0) { // already init(bind multiple rows by single column) if (pBuilder->compareStat == ROW_COMPARE_NEED && (pBuilder->rowInfo != NULL)) { @@ -586,41 +579,12 @@ int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCo } } - // default compareStat is ROW_COMPARE_NO_NEED - if (nBoundCols == 0) { // file input - pBuilder->memRowType = SMEM_ROW_DATA; - return TSDB_CODE_SUCCESS; + uint32_t dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + pColInfo->allNullLen; + uint32_t kvLen = TD_MEM_ROW_KV_HEAD_SIZE + pColInfo->numOfBound * sizeof(SColIdx) + pColInfo->boundNullLen; + if (isUtilizeKVRow(kvLen, dataLen)) { + pBuilder->memRowType = SMEM_ROW_KV; } else { - float boundRatio = ((float)nBoundCols / (float)nCols); - - if (boundRatio < KVRatioKV) { - pBuilder->memRowType = SMEM_ROW_KV; - return TSDB_CODE_SUCCESS; - } else if (boundRatio > KVRatioData) { - pBuilder->memRowType = SMEM_ROW_DATA; - return TSDB_CODE_SUCCESS; - } - pBuilder->compareStat = ROW_COMPARE_NEED; - - if (boundRatio < KVRatioPredict) { - pBuilder->memRowType = SMEM_ROW_KV; - } else { - pBuilder->memRowType = SMEM_ROW_DATA; - } - } - - pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + nBoundCols * sizeof(SColIdx); - - if (nRows > 0) { - pBuilder->rowInfo = calloc(nRows, sizeof(SMemRowInfo)); - if (pBuilder->rowInfo == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - for (int i = 0; i < nRows; ++i) { - (pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen; - (pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen; - } + pBuilder->memRowType = SMEM_ROW_DATA; } return TSDB_CODE_SUCCESS; diff --git a/source/libs/parser/src/insertParser.c b/source/libs/parser/src/insertParser.c index 8b1506bad1..ccb5313ed1 100644 --- a/source/libs/parser/src/insertParser.c +++ b/source/libs/parser/src/insertParser.c @@ -440,13 +440,8 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, } if (!isParseBindParam) { - // 2. check and set convert flag - if (pBuilder->compareStat == ROW_COMPARE_NEED) { - convertMemRow(row, spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE, pBuilder->kvRowInitLen); - } - - // 3. set the null value for the columns that do not assign values - if ((spd->numOfBound < spd->numOfCols) && isDataRow(row) && !isNeedConvertRow(row)) { + // set the null value for the columns that do not assign values + if ((spd->numOfBound < spd->numOfCols) && isDataRow(row)) { SDataRow dataRow = memRowDataBody(row); for (int32_t i = 0; i < spd->numOfCols; ++i) { if (spd->cols[i].valStat == VAL_STAT_NONE) { @@ -456,7 +451,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, } } - *len = getExtendedRowSize(pDataBlocks); + *len = pBuilder->rowSize; return TSDB_CODE_SUCCESS; } @@ -464,7 +459,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) { STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta); int32_t extendedRowSize = getExtendedRowSize(pDataBlock); - CHECK_CODE(initMemRowBuilder(&pDataBlock->rowBuilder, 0, tinfo.numOfColumns, pDataBlock->boundColumnInfo.numOfBound, pDataBlock->boundColumnInfo.allNullLen)); + CHECK_CODE(initMemRowBuilder(&pDataBlock->rowBuilder, 0, &pDataBlock->boundColumnInfo)); (*numOfRows) = 0; char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" diff --git a/source/libs/parser/src/parserUtil.c b/source/libs/parser/src/parserUtil.c index 4d506b84a0..df415460be 100644 --- a/source/libs/parser/src/parserUtil.c +++ b/source/libs/parser/src/parserUtil.c @@ -402,7 +402,7 @@ static void createInputDataFilterInfo(SQueryStmtInfo* px, int32_t numOfCol1, int * @param dataBlocks * @return */ -//int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, SName* name, +// int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, SName* name, // STableMeta* pTableMeta, STableDataBlocks** dataBlocks) { // STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks)); // if (dataBuf == NULL) { @@ -449,7 +449,7 @@ static void createInputDataFilterInfo(SQueryStmtInfo* px, int32_t numOfCol1, int // return TSDB_CODE_SUCCESS; //} // -//int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, +// int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize, // SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks, // SArray* pBlockList) { // *dataBlocks = NULL; @@ -474,7 +474,7 @@ static void createInputDataFilterInfo(SQueryStmtInfo* px, int32_t numOfCol1, int //} // //// Erase the empty space reserved for binary data -//static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SInsertStatementParam* insertParam, +// static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SInsertStatementParam* insertParam, // SBlockKeyTuple* blkKeyTuple) { // // TODO: optimize this function, handle the case while binary is not presented // STableMeta* pTableMeta = pTableDataBlock->pTableMeta; @@ -536,27 +536,20 @@ static void createInputDataFilterInfo(SQueryStmtInfo* px, int32_t numOfCol1, int // } // } else { // for (int32_t i = 0; i < numOfRows; ++i) { -// char* payload = (blkKeyTuple + i)->payloadAddr; -// if (isNeedConvertRow(payload)) { -// convertSMemRow(pDataBlock, payload, pTableDataBlock); -// TDRowTLenT rowTLen = memRowTLen(pDataBlock); -// pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); -// pBlock->dataLen += rowTLen; -// } else { -// TDRowTLenT rowTLen = memRowTLen(payload); -// memcpy(pDataBlock, payload, rowTLen); -// pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); -// pBlock->dataLen += rowTLen; -// } +// char* payload = (blkKeyTuple + i)->payloadAddr; +// TDRowLenT rowTLen = memRowTLen(payload); +// memcpy(pDataBlock, payload, rowTLen); +// pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); +// pBlock->dataLen += rowTLen; // } // } -// + // int32_t len = pBlock->dataLen + pBlock->schemaLen; // pBlock->dataLen = htonl(pBlock->dataLen); // pBlock->schemaLen = htonl(pBlock->schemaLen); -// + // return len; -//} +// } TAOS_FIELD createField(const SSchema* pSchema) { TAOS_FIELD f = { .type = pSchema->type, .bytes = pSchema->bytes, }; diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 7dc9c0e28b..7cec88ea1b 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -113,8 +113,9 @@ typedef struct SParsedDataColInfo { int16_t numOfCols; int16_t numOfBound; uint16_t flen; // TODO: get from STSchema - uint16_t allNullLen; // TODO: get from STSchema + uint16_t allNullLen; // TODO: get from STSchema(base on SDataRow) uint16_t extendedVarLen; + uint16_t boundNullLen; // bound column len with all NULL value(without VarDataOffsetT/SColIdx part) int32_t * boundedColumns; // bound column idx according to schema SBoundColumn * cols; SBoundIdxInfo *colIdxInfo; @@ -130,7 +131,7 @@ typedef struct { typedef struct { uint8_t memRowType; // default is 0, that is SDataRow uint8_t compareStat; // 0 no need, 1 need compare - TDRowTLenT kvRowInitLen; + int32_t rowSize; SMemRowInfo *rowInfo; } SMemRowBuilder; @@ -141,8 +142,7 @@ typedef enum { int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec); -int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, - int32_t allNullLen); +int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo); void destroyMemRowBuilder(SMemRowBuilder *pBuilder); /** @@ -199,9 +199,9 @@ static FORCE_INLINE void tscAppendMemRowColVal(SMemRow row, const void *value, b } // Applicable to consume by one row -static FORCE_INLINE void tscAppendMemRowColValEx(SMemRow row, const void *value, bool isCopyVarData, int16_t colId, - int8_t colType, int32_t toffset, int32_t *dataLen, int32_t *kvLen, - uint8_t compareStat) { +static FORCE_INLINE void tscAppendMemRowColVal(SMemRow row, const void *value, bool isCopyVarData, int16_t colId, + int8_t colType, int32_t toffset, int32_t *dataLen, int32_t *kvLen, + uint8_t compareStat) { tdAppendMemRowColVal(row, value, isCopyVarData, colId, colType, toffset); if (compareStat == ROW_COMPARE_NEED) { tdGetColAppendDeltaLen(value, colType, dataLen, kvLen); @@ -513,16 +513,6 @@ static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) { return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen; } -static FORCE_INLINE void checkAndConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) { - if (isDataRow(row)) { - if (kvLen < (dataLen * KVRatioConvert)) { - memRowSetConvert(row); - } - } else if (kvLen > dataLen) { - memRowSetConvert(row); - } -} - static FORCE_INLINE void initSMemRow(SMemRow row, uint8_t memRowType, STableDataBlocks *pBlock, int16_t nBoundCols) { memRowSetType(row, memRowType); if (isDataRowT(memRowType)) { @@ -622,8 +612,7 @@ static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE; static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE; static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, SMemRow row, char *msg, char **str, - bool primaryKey, int16_t timePrec, int32_t toffset, int16_t colId, - int32_t *dataLen, int32_t *kvLen, uint8_t compareStat) { + bool primaryKey, int16_t timePrec, int32_t toffset, int16_t colId) { int64_t iv; int32_t ret; char * endptr = NULL; @@ -635,26 +624,22 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok switch (pSchema->type) { case TSDB_DATA_TYPE_BOOL: { // bool if (isNullStr(pToken)) { - tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); + tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset); } else { if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { if (strncmp(pToken->z, "true", pToken->n) == 0) { - tscAppendMemRowColValEx(row, &TRUE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, &TRUE_VALUE, true, colId, pSchema->type, toffset); } else if (strncmp(pToken->z, "false", pToken->n) == 0) { - tscAppendMemRowColValEx(row, &FALSE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); + tscAppendMemRowColVal(row, &FALSE_VALUE, true, colId, pSchema->type, toffset); } else { return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z); } } else if (pToken->type == TK_INTEGER) { iv = strtoll(pToken->z, NULL, 10); - tscAppendMemRowColValEx(row, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset, - dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset); } else if (pToken->type == TK_FLOAT) { double dv = strtod(pToken->z, NULL); - tscAppendMemRowColValEx(row, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset, - dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset); } else { return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z); } @@ -664,8 +649,7 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok case TSDB_DATA_TYPE_TINYINT: if (isNullStr(pToken)) { - tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); + tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); if (ret != TSDB_CODE_SUCCESS) { @@ -675,15 +659,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok } uint8_t tmpVal = (uint8_t)iv; - tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset); } break; case TSDB_DATA_TYPE_UTINYINT: if (isNullStr(pToken)) { - tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); + tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); if (ret != TSDB_CODE_SUCCESS) { @@ -693,15 +676,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok } uint8_t tmpVal = (uint8_t)iv; - tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset); } break; case TSDB_DATA_TYPE_SMALLINT: if (isNullStr(pToken)) { - tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); + tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); if (ret != TSDB_CODE_SUCCESS) { @@ -711,15 +693,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok } int16_t tmpVal = (int16_t)iv; - tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset); } break; case TSDB_DATA_TYPE_USMALLINT: if (isNullStr(pToken)) { - tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); + tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); if (ret != TSDB_CODE_SUCCESS) { @@ -729,15 +710,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok } uint16_t tmpVal = (uint16_t)iv; - tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset); } break; case TSDB_DATA_TYPE_INT: if (isNullStr(pToken)) { - tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); + tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); if (ret != TSDB_CODE_SUCCESS) { @@ -747,15 +727,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok } int32_t tmpVal = (int32_t)iv; - tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset); } break; case TSDB_DATA_TYPE_UINT: if (isNullStr(pToken)) { - tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); + tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); if (ret != TSDB_CODE_SUCCESS) { @@ -765,15 +744,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok } uint32_t tmpVal = (uint32_t)iv; - tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset); } break; case TSDB_DATA_TYPE_BIGINT: if (isNullStr(pToken)) { - tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); + tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); if (ret != TSDB_CODE_SUCCESS) { @@ -782,14 +760,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z); } - tscAppendMemRowColValEx(row, &iv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, &iv, true, colId, pSchema->type, toffset); } break; case TSDB_DATA_TYPE_UBIGINT: if (isNullStr(pToken)) { - tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); + tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); if (ret != TSDB_CODE_SUCCESS) { @@ -799,14 +776,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok } uint64_t tmpVal = (uint64_t)iv; - tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset); } break; case TSDB_DATA_TYPE_FLOAT: if (isNullStr(pToken)) { - tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); + tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset); } else { double dv; if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) { @@ -819,14 +795,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok } float tmpVal = (float)dv; - tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset); } break; case TSDB_DATA_TYPE_DOUBLE: if (isNullStr(pToken)) { - tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); + tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset); } else { double dv; if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) { @@ -837,15 +812,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok return tscInvalidOperationMsg(msg, "illegal double data", pToken->z); } - tscAppendMemRowColValEx(row, &dv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, &dv, true, colId, pSchema->type, toffset); } break; case TSDB_DATA_TYPE_BINARY: // binary data cannot be null-terminated char string, otherwise the last char of the string is lost if (pToken->type == TK_NULL) { - tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); + tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset); } else { // too long values will return invalid sql, not be truncated automatically if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor return tscInvalidOperationMsg(msg, "string data overflow", pToken->z); @@ -853,14 +827,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok // STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n); char *rowEnd = memRowEnd(row); STR_WITH_SIZE_TO_VARSTR(rowEnd, pToken->z, pToken->n); - tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, rowEnd, false, colId, pSchema->type, toffset); } break; case TSDB_DATA_TYPE_NCHAR: if (pToken->type == TK_NULL) { - tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); + tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset); } else { // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' int32_t output = 0; @@ -872,7 +845,7 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok return tscInvalidOperationMsg(msg, buf, pToken->z); } varDataSetLen(rowEnd, output); - tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, rowEnd, false, colId, pSchema->type, toffset); } break; @@ -881,17 +854,16 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok if (primaryKey) { // When building SKVRow primaryKey, we should not skip even with NULL value. int64_t tmpVal = 0; - tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset); } else { - tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, - compareStat); + tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset); } } else { int64_t tmpVal; if (tsParseTime(pToken, &tmpVal, str, msg, timePrec) != TSDB_CODE_SUCCESS) { return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z); } - tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat); + tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset); } break; diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 1bf27e6cad..6a0201d6a7 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -41,9 +41,8 @@ enum { static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t *numOfRows); static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo *pColInfo, SSchema *pSchema, char *str, char **end); -int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, - int32_t allNullLen) { - ASSERT(nRows >= 0 && nCols > 0 && (nBoundCols <= nCols)); +int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo) { + ASSERT(nRows >= 0 && pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols)); if (nRows > 0) { // already init(bind multiple rows by single column) if (pBuilder->compareStat == ROW_COMPARE_NEED && (pBuilder->rowInfo != NULL)) { @@ -51,41 +50,12 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3 } } - // default compareStat is ROW_COMPARE_NO_NEED - if (nBoundCols == 0) { // file input - pBuilder->memRowType = SMEM_ROW_DATA; - return TSDB_CODE_SUCCESS; + uint32_t dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + pColInfo->allNullLen; + uint32_t kvLen = TD_MEM_ROW_KV_HEAD_SIZE + pColInfo->numOfBound * sizeof(SColIdx) + pColInfo->boundNullLen; + if (isUtilizeKVRow(kvLen, dataLen)) { + pBuilder->memRowType = SMEM_ROW_KV; } else { - float boundRatio = ((float)nBoundCols / (float)nCols); - - if (boundRatio < KVRatioKV) { - pBuilder->memRowType = SMEM_ROW_KV; - return TSDB_CODE_SUCCESS; - } else if (boundRatio > KVRatioData) { - pBuilder->memRowType = SMEM_ROW_DATA; - return TSDB_CODE_SUCCESS; - } - pBuilder->compareStat = ROW_COMPARE_NEED; - - if (boundRatio < KVRatioPredict) { - pBuilder->memRowType = SMEM_ROW_KV; - } else { - pBuilder->memRowType = SMEM_ROW_DATA; - } - } - - pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + nBoundCols * sizeof(SColIdx); - - if (nRows > 0) { - pBuilder->rowInfo = tcalloc(nRows, sizeof(SMemRowInfo)); - if (pBuilder->rowInfo == NULL) { - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - for (int i = 0; i < nRows; ++i) { - (pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen; - (pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen; - } + pBuilder->memRowType = SMEM_ROW_DATA; } return TSDB_CODE_SUCCESS; @@ -457,8 +427,6 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i STableMeta * pTableMeta = pDataBlocks->pTableMeta; SSchema * schema = tscGetTableSchema(pTableMeta); SMemRowBuilder * pBuilder = &pDataBlocks->rowBuilder; - int32_t dataLen = spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE; - int32_t kvLen = pBuilder->kvRowInitLen; bool isParseBindParam = false; initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound); @@ -535,8 +503,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i int16_t colId = -1; tscGetMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &toffset, &colId); - int32_t ret = tsParseOneColumnKV(pSchema, &sToken, row, pInsertParam->msg, str, isPrimaryKey, timePrec, toffset, - colId, &dataLen, &kvLen, pBuilder->compareStat); + int32_t ret = + tsParseOneColumnKV(pSchema, &sToken, row, pInsertParam->msg, str, isPrimaryKey, timePrec, toffset, colId); if (ret != TSDB_CODE_SUCCESS) { return ret; } @@ -551,13 +519,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i } if (!isParseBindParam) { - // 2. check and set convert flag - if (pBuilder->compareStat == ROW_COMPARE_NEED) { - checkAndConvertMemRow(row, dataLen, kvLen); - } - - // 3. set the null value for the columns that do not assign values - if ((spd->numOfBound < spd->numOfCols) && isDataRow(row) && !isNeedConvertRow(row)) { + // set the null value for the columns that do not assign values + if ((spd->numOfBound < spd->numOfCols) && isDataRow(row)) { SDataRow dataRow = memRowDataBody(row); for (int32_t i = 0; i < spd->numOfCols; ++i) { if (spd->cols[i].valStat == VAL_STAT_NONE) { @@ -567,7 +530,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i } } - *len = getExtendedRowSize(pDataBlocks); + *len = pBuilder->rowSize; return TSDB_CODE_SUCCESS; } @@ -620,11 +583,10 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn int32_t extendedRowSize = getExtendedRowSize(pDataBlock); - if (TSDB_CODE_SUCCESS != - (code = initMemRowBuilder(&pDataBlock->rowBuilder, 0, tinfo.numOfColumns, pDataBlock->boundColumnInfo.numOfBound, - pDataBlock->boundColumnInfo.allNullLen))) { + if (TSDB_CODE_SUCCESS != (code = initMemRowBuilder(&pDataBlock->rowBuilder, 0, &pDataBlock->boundColumnInfo))) { return code; } + pDataBlock->rowBuilder.rowSize = extendedRowSize; while (1) { index = 0; sToken = tStrGetToken(*str, &index, false); @@ -703,6 +665,7 @@ void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32 pColInfo->boundedColumns[i] = i; } pColInfo->allNullLen += pColInfo->flen; + pColInfo->boundNullLen = pColInfo->allNullLen; // default set allNullLen pColInfo->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT)); } @@ -1200,6 +1163,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat int32_t nCols = pColInfo->numOfCols; pColInfo->numOfBound = 0; + pColInfo->boundNullLen = 0; memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * nCols); for (int32_t i = 0; i < nCols; ++i) { pColInfo->cols[i].valStat = VAL_STAT_NONE; @@ -1249,6 +1213,17 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat pColInfo->cols[t].valStat = VAL_STAT_HAS; pColInfo->boundedColumns[pColInfo->numOfBound] = t; ++pColInfo->numOfBound; + switch (pSchema[t].type) { + case TSDB_DATA_TYPE_BINARY: + pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES); + break; + case TSDB_DATA_TYPE_NCHAR: + pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE); + break; + default: + pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type]; + break; + } findColumnIndex = true; if (isOrdered && (lastColIdx > t)) { isOrdered = false; @@ -1272,6 +1247,17 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat pColInfo->cols[t].valStat = VAL_STAT_HAS; pColInfo->boundedColumns[pColInfo->numOfBound] = t; ++pColInfo->numOfBound; + switch (pSchema[t].type) { + case TSDB_DATA_TYPE_BINARY: + pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES); + break; + case TSDB_DATA_TYPE_NCHAR: + pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE); + break; + default: + pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type]; + break; + } findColumnIndex = true; if (isOrdered && (lastColIdx > t)) { isOrdered = false; @@ -1715,12 +1701,17 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow goto _error; } - tscAllocateMemIfNeed(pTableDataBlock, getExtendedRowSize(pTableDataBlock), &maxRows); + int32_t extendedRowSize = getExtendedRowSize(pTableDataBlock); + tscAllocateMemIfNeed(pTableDataBlock, extendedRowSize, &maxRows); tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW); if (tokenBuf == NULL) { code = TSDB_CODE_TSC_OUT_OF_MEMORY; goto _error; } + + // insert from .csv means full and ordered columns, thus use SDataRow all the time + ASSERT(SMEM_ROW_DATA == pTableDataBlock->rowBuilder.memRowType); + pTableDataBlock->rowBuilder.rowSize = extendedRowSize; while ((readLen = tgetline(&line, &n, fp)) != -1) { if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index b7d88f94ca..3a8c740ea1 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1881,18 +1881,11 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI } } else { for (int32_t i = 0; i < numOfRows; ++i) { - char* payload = (blkKeyTuple + i)->payloadAddr; - if (isNeedConvertRow(payload)) { - convertSMemRow(pDataBlock, payload, pTableDataBlock); - TDRowTLenT rowTLen = memRowTLen(pDataBlock); - pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); - pBlock->dataLen += rowTLen; - } else { - TDRowTLenT rowTLen = memRowTLen(payload); - memcpy(pDataBlock, payload, rowTLen); - pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); - pBlock->dataLen += rowTLen; - } + char* payload = (blkKeyTuple + i)->payloadAddr; + TDRowLenT rowTLen = memRowTLen(payload); + memcpy(pDataBlock, payload, rowTLen); + pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen); + pBlock->dataLen += rowTLen; } }