porting 2.0 of TS-854
This commit is contained in:
parent
17a26c4703
commit
3f7813db7b
|
@ -328,6 +328,7 @@ typedef struct SDataCol {
|
|||
int len; // column data length
|
||||
VarDataOffsetT *dataOff; // For binary and nchar data, the offset in the data column
|
||||
void * pData; // Actual data pointer
|
||||
void * pBitmap; // Bitmap pointer to support None value
|
||||
TSKEY ts; // only used in last NULL column
|
||||
} SDataCol;
|
||||
|
||||
|
@ -572,7 +573,7 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
|
|||
return 0;
|
||||
}
|
||||
|
||||
// ----------------- SMemRow appended with sequential data row structure
|
||||
// ----------------- SRow appended with tuple row structure
|
||||
/*
|
||||
* |---------|------------------------------------------------- len ---------------------------------->|
|
||||
* |<-------- Head ------>|<--------- flen -------------->| |
|
||||
|
@ -585,7 +586,7 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
|
|||
* NOTE: timestamp in this row structure is TKEY instead of TSKEY
|
||||
*/
|
||||
|
||||
// ----------------- SMemRow appended with extended K-V data row structure
|
||||
// ----------------- SRow appended with extended K-V data row structure
|
||||
/* |--------------------|------------------------------------------------ len ---------------------------------->|
|
||||
* |<------------- Head ------------>|<--------- flen -------------->| |
|
||||
* |--------------------+----------+--------------------------------------------+---------------------------------+
|
||||
|
@ -605,22 +606,17 @@ typedef void *SMemRow;
|
|||
|
||||
#define SMEM_ROW_DATA 0x0U // SDataRow
|
||||
#define SMEM_ROW_KV 0x01U // SKVRow
|
||||
#define SMEM_ROW_CONVERT 0x80U // SMemRow convert flag
|
||||
|
||||
#define KVRatioKV (0.2f) // all bool
|
||||
#define KVRatioPredict (0.4f)
|
||||
#define KVRatioData (0.75f) // all bigint
|
||||
#define KVRatioConvert (0.9f)
|
||||
|
||||
#define memRowType(r) ((*(uint8_t *)(r)) & 0x01)
|
||||
|
||||
#define memRowSetType(r, t) ((*(uint8_t *)(r)) = (t)) // set the total byte in case of dirty memory
|
||||
#define memRowSetConvert(r) ((*(uint8_t *)(r)) = (((*(uint8_t *)(r)) & 0x7F) | SMEM_ROW_CONVERT)) // highest bit
|
||||
#define isDataRowT(t) (SMEM_ROW_DATA == (((uint8_t)(t)) & 0x01))
|
||||
#define isDataRow(r) (SMEM_ROW_DATA == memRowType(r))
|
||||
#define isKvRowT(t) (SMEM_ROW_KV == (((uint8_t)(t)) & 0x01))
|
||||
#define isKvRow(r) (SMEM_ROW_KV == memRowType(r))
|
||||
#define isNeedConvertRow(r) (((*(uint8_t *)(r)) & 0x80) == SMEM_ROW_CONVERT)
|
||||
#define isUtilizeKVRow(k, d) ((k) < ((d)*KVRatioConvert))
|
||||
|
||||
#define memRowDataBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) // section after flag
|
||||
#define memRowKvBody(r) \
|
||||
|
@ -630,9 +626,9 @@ typedef void *SMemRow;
|
|||
#define memRowKvLen(r) (*(TDRowLenT *)memRowKvBody(r)) // 0~65535
|
||||
|
||||
#define memRowDataTLen(r) \
|
||||
((TDRowTLenT)(memRowDataLen(r) + TD_MEM_ROW_TYPE_SIZE)) // using uint32_t/int32_t to store the TLen
|
||||
((TDRowLenT)(memRowDataLen(r) + TD_MEM_ROW_TYPE_SIZE)) // using uint32_t/int32_t to store the TLen
|
||||
|
||||
#define memRowKvTLen(r) ((TDRowTLenT)(memRowKvLen(r) + TD_MEM_ROW_KV_TYPE_VER_SIZE))
|
||||
#define memRowKvTLen(r) ((TDRowLenT)(memRowKvLen(r) + TD_MEM_ROW_KV_TYPE_VER_SIZE))
|
||||
|
||||
#define memRowLen(r) (isDataRow(r) ? memRowDataLen(r) : memRowKvLen(r))
|
||||
#define memRowTLen(r) (isDataRow(r) ? memRowDataTLen(r) : memRowKvTLen(r)) // using uint32_t/int32_t to store the TLen
|
||||
|
@ -774,7 +770,7 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch
|
|||
*/
|
||||
|
||||
#define PAYLOAD_NCOLS_LEN sizeof(uint16_t)
|
||||
#define PAYLOAD_NCOLS_OFFSET (sizeof(uint8_t) + sizeof(TDRowTLenT))
|
||||
#define PAYLOAD_NCOLS_OFFSET (sizeof(uint8_t) + sizeof(TDRowLenT))
|
||||
#define PAYLOAD_HEADER_LEN (PAYLOAD_NCOLS_OFFSET + PAYLOAD_NCOLS_LEN)
|
||||
#define PAYLOAD_ID_LEN sizeof(int16_t)
|
||||
#define PAYLOAD_ID_TYPE_LEN (sizeof(int16_t) + sizeof(uint8_t))
|
||||
|
@ -784,7 +780,7 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch
|
|||
#define payloadBody(r) POINTER_SHIFT(r, PAYLOAD_HEADER_LEN)
|
||||
#define payloadType(r) (*(uint8_t *)(r))
|
||||
#define payloadSetType(r, t) (payloadType(r) = (t))
|
||||
#define payloadTLen(r) (*(TDRowTLenT *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) // including total header
|
||||
#define payloadTLen(r) (*(TDRowLenT *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) // including total header
|
||||
#define payloadSetTLen(r, l) (payloadTLen(r) = (l))
|
||||
#define payloadNCols(r) (*(TDRowLenT *)POINTER_SHIFT(r, PAYLOAD_NCOLS_OFFSET))
|
||||
#define payloadSetNCols(r, n) (payloadNCols(r) = (n))
|
||||
|
|
|
@ -26,13 +26,102 @@
|
|||
extern "C" {
|
||||
#endif
|
||||
|
||||
#define TD_UNDECIDED_ROW 0
|
||||
#define TD_OR_ROW 1
|
||||
#define TD_KV_ROW 2
|
||||
#define TD_SUPPORT_NONE_VAL
|
||||
|
||||
// row type
|
||||
#define TD_ROW_TP 0 // default
|
||||
#define TD_ROW_KV 1
|
||||
|
||||
// val type
|
||||
#define TD_VTYPE_NORM 0x0 // normal val: not none, not null
|
||||
#define TD_VTYPE_NONE 0x01 // none or unknown/undefined
|
||||
#define TD_VTYPE_NULL 0x02 // null val
|
||||
|
||||
#ifdef TD_SUPPORT_NONE_VAL
|
||||
static FORCE_INLINE bool tdValTypeIsNorm(int8_t valType) { return (valType & TD_VTYPE_NORM); }
|
||||
static FORCE_INLINE bool tdValTypeIsNone(int8_t valType) { return (valType & TD_VTYPE_NONE); }
|
||||
static FORCE_INLINE bool tdValTypeIsNull(int8_t valType) { return (valType & TD_VTYPE_NULL); }
|
||||
#else
|
||||
|
||||
#endif
|
||||
|
||||
static FORCE_INLINE bool tdValIsNorm(int8_t valType, const void *val, int32_t type) {
|
||||
#ifdef TD_SUPPORT_NONE_VAL
|
||||
return tdValTypeIsNorm(valType);
|
||||
#else
|
||||
return !isNull(val, type);
|
||||
#endif
|
||||
}
|
||||
static FORCE_INLINE bool tdValIsNone(int8_t valType) {
|
||||
#ifdef TD_SUPPORT_NONE_VAL
|
||||
return tdValTypeIsNone(valType);
|
||||
#else
|
||||
return false;
|
||||
#endif
|
||||
}
|
||||
static FORCE_INLINE bool tdValIsNull(int8_t valType, const void *val, int32_t type) {
|
||||
#ifdef TD_SUPPORT_NONE_VAL
|
||||
return tdValTypeIsNull(valType);
|
||||
#else
|
||||
return isNull(val, type);
|
||||
#endif
|
||||
}
|
||||
|
||||
#define TD_ROW_LEN(r)
|
||||
#define TD_ROW_TLEN(r)
|
||||
#define TD_ROW_TYPE(r)
|
||||
#define TD_ROW_BODY(r)
|
||||
#define TD_ROW_TKEY(r)
|
||||
#define TD_ROW_KEY(r)
|
||||
#define TD_ROW_DELETED(r)
|
||||
#define TD_ROW_VERSION(r)
|
||||
#define TD_ROW_MAX_BYTES_FROM_SCHEMA(s)
|
||||
|
||||
#define TD_ROW_SET_LEN(r, l)
|
||||
#define TD_ROW_SET_VERSION(r, v)
|
||||
#define TD_ROW_CPY(dst, r)
|
||||
|
||||
// ----------------- SRow appended with tuple row structure(STpRow)
|
||||
/*
|
||||
* |---------|------------------------------------------------- len -------------------------------------->|
|
||||
* |<-------- Head ------>|<--------- flen ------------->|<-- blen -->| |
|
||||
* |---------+---------------------+---------------------------------+-------------+-----------------------+
|
||||
* | uint8_t | uint32_t | int16_t | | | |
|
||||
* |---------+----------+----------+---------------------------------+-------------------------------------+
|
||||
* | flag | len | sversion |(key) first part | bitmap | second part |
|
||||
* +---------+----------+----------+---------------------------------+-------------------------------------+
|
||||
*
|
||||
* NOTE: Timestamp in this row structure is TKEY instead of TSKEY
|
||||
* Use 2 bits in bitmap for each column
|
||||
* flag:
|
||||
* 0: flag&0x01 0 STpRow, 1 SKvRow // since 2.0
|
||||
* 1: flag&0x02 0 without bitmap, 1 with bitmap. // 如果None值支持数据库或者更小维度,则需要指定一个bit区分。TODO
|
||||
* 2: endian(0 big endian, 1 little endian)
|
||||
* 3-7: reserved(value 0)
|
||||
*/
|
||||
|
||||
// ----------------- SRow appended with K-V row structure(SKvRow)
|
||||
/* |--------------------|------------------------------------------------ len --------------------------->|
|
||||
* |<-------- Head ---->|<--------- colsIdxLen ---------->|<-- blen -->| |
|
||||
* |--------------------+----------+-----------------------------------------------------------------------+
|
||||
* | uint8_t | uint32_t | int16_t | | | |
|
||||
* |---------+----------+----------+-----------------------------------------------------------------------+
|
||||
* | flag | len | ncols |(keyColId) cols index | bitmap | data part |
|
||||
* |---------+----------+----------+---------------------------------+-------------------------------------+
|
||||
*
|
||||
* NOTE: Timestamp in this row structure is TKEY instead of TSKEY
|
||||
*/
|
||||
|
||||
typedef void *SRow;
|
||||
|
||||
typedef struct {
|
||||
int8_t valType;
|
||||
void * val;
|
||||
} SCellVal;
|
||||
|
||||
typedef struct {
|
||||
// TODO
|
||||
} SOrRow;
|
||||
} STpRow; // tuple
|
||||
|
||||
typedef struct {
|
||||
col_id_t cid;
|
||||
|
|
|
@ -10,8 +10,7 @@ extern "C" {
|
|||
|
||||
// ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR
|
||||
typedef int32_t VarDataOffsetT;
|
||||
typedef uint16_t TDRowLenT; // not including overhead: 0 ~ 65535
|
||||
typedef uint32_t TDRowTLenT; // total length, including overhead
|
||||
typedef uint32_t TDRowLenT;
|
||||
|
||||
typedef struct tstr {
|
||||
VarDataLenT len;
|
||||
|
|
|
@ -79,7 +79,6 @@ typedef int16_t VarDataLenT; // maxVarDataLen: 32767
|
|||
#define varDataVal(v) ((void *)((char *)v + VARSTR_HEADER_SIZE))
|
||||
|
||||
typedef int32_t VarDataOffsetT;
|
||||
typedef int16_t VarDataLenT; // maxVarDataLen: 32767
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -792,7 +792,7 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch
|
|||
dataRowSetVersion(dataRow, schemaVersion(pSchema1)); // use latest schema version
|
||||
dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pSchema1->flen));
|
||||
|
||||
TDRowTLenT dataLen = 0, kvLen = TD_MEM_ROW_KV_HEAD_SIZE;
|
||||
TDRowLenT dataLen = 0, kvLen = TD_MEM_ROW_KV_HEAD_SIZE;
|
||||
|
||||
int32_t i = 0; // row1
|
||||
int32_t j = 0; // row2
|
||||
|
|
|
@ -55,11 +55,12 @@ typedef struct SParsedDataColInfo {
|
|||
int16_t numOfCols;
|
||||
int16_t numOfBound;
|
||||
uint16_t flen; // TODO: get from STSchema
|
||||
uint16_t allNullLen; // TODO: get from STSchema
|
||||
uint16_t allNullLen; // TODO: get from STSchema(base on SDataRow)
|
||||
uint16_t extendedVarLen;
|
||||
int32_t *boundedColumns; // bound column idx according to schema
|
||||
SBoundColumn *cols;
|
||||
SBoundIdxInfo *colIdxInfo;
|
||||
uint16_t boundNullLen; // bound column len with all NULL value(without VarDataOffsetT/SColIdx part)
|
||||
int32_t * boundedColumns; // bound column idx according to schema
|
||||
SBoundColumn * cols;
|
||||
SBoundIdxInfo *colIdxInfo;
|
||||
int8_t orderStatus; // bound columns
|
||||
} SParsedDataColInfo;
|
||||
|
||||
|
@ -71,7 +72,7 @@ typedef struct SMemRowInfo {
|
|||
typedef struct {
|
||||
uint8_t memRowType; // default is 0, that is SDataRow
|
||||
uint8_t compareStat; // 0 no need, 1 need compare
|
||||
TDRowTLenT kvRowInitLen;
|
||||
int32_t rowSize;
|
||||
SMemRowInfo *rowInfo;
|
||||
} SMemRowBuilder;
|
||||
|
||||
|
@ -143,16 +144,6 @@ static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowTyp
|
|||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE void convertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) {
|
||||
if (isDataRow(row)) {
|
||||
if (kvLen < (dataLen * KVRatioConvert)) {
|
||||
memRowSetConvert(row);
|
||||
}
|
||||
} else if (kvLen > dataLen) {
|
||||
memRowSetConvert(row);
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t setBlockInfo(SSubmitBlk *pBlocks, const STableMeta *pTableMeta, int32_t numOfRows) {
|
||||
pBlocks->tid = pTableMeta->suid;
|
||||
pBlocks->uid = pTableMeta->uid;
|
||||
|
@ -171,7 +162,7 @@ int32_t boundIdxCompar(const void *lhs, const void *rhs);
|
|||
void setBoundColumnInfo(SParsedDataColInfo* pColList, SSchema* pSchema, int32_t numOfCols);
|
||||
void destroyBoundColumnInfo(SParsedDataColInfo* pColList);
|
||||
void destroyBlockArrayList(SArray* pDataBlockList);
|
||||
int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen);
|
||||
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo);
|
||||
int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows);
|
||||
int32_t getDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
|
||||
const STableMeta* pTableMeta, STableDataBlocks** dataBlocks, SArray* pBlockList);
|
||||
|
|
|
@ -448,18 +448,11 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB
|
|||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
char* payload = (blkKeyTuple + i)->payloadAddr;
|
||||
if (isNeedConvertRow(payload)) {
|
||||
convertSMemRow(pDataBlock, payload, pTableDataBlock);
|
||||
TDRowTLenT rowTLen = memRowTLen(pDataBlock);
|
||||
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||
pBlock->dataLen += rowTLen;
|
||||
} else {
|
||||
TDRowTLenT rowTLen = memRowTLen(payload);
|
||||
memcpy(pDataBlock, payload, rowTLen);
|
||||
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||
pBlock->dataLen += rowTLen;
|
||||
}
|
||||
char* payload = (blkKeyTuple + i)->payloadAddr;
|
||||
TDRowLenT rowTLen = memRowTLen(payload);
|
||||
memcpy(pDataBlock, payload, rowTLen);
|
||||
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||
pBlock->dataLen += rowTLen;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -577,8 +570,8 @@ int32_t allocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols, int32_t allNullLen) {
|
||||
ASSERT(nRows >= 0 && nCols > 0 && (nBoundCols <= nCols));
|
||||
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo) {
|
||||
ASSERT(nRows >= 0 && pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols));
|
||||
if (nRows > 0) {
|
||||
// already init(bind multiple rows by single column)
|
||||
if (pBuilder->compareStat == ROW_COMPARE_NEED && (pBuilder->rowInfo != NULL)) {
|
||||
|
@ -586,41 +579,12 @@ int32_t initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCo
|
|||
}
|
||||
}
|
||||
|
||||
// default compareStat is ROW_COMPARE_NO_NEED
|
||||
if (nBoundCols == 0) { // file input
|
||||
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
uint32_t dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + pColInfo->allNullLen;
|
||||
uint32_t kvLen = TD_MEM_ROW_KV_HEAD_SIZE + pColInfo->numOfBound * sizeof(SColIdx) + pColInfo->boundNullLen;
|
||||
if (isUtilizeKVRow(kvLen, dataLen)) {
|
||||
pBuilder->memRowType = SMEM_ROW_KV;
|
||||
} else {
|
||||
float boundRatio = ((float)nBoundCols / (float)nCols);
|
||||
|
||||
if (boundRatio < KVRatioKV) {
|
||||
pBuilder->memRowType = SMEM_ROW_KV;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (boundRatio > KVRatioData) {
|
||||
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
pBuilder->compareStat = ROW_COMPARE_NEED;
|
||||
|
||||
if (boundRatio < KVRatioPredict) {
|
||||
pBuilder->memRowType = SMEM_ROW_KV;
|
||||
} else {
|
||||
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||
}
|
||||
}
|
||||
|
||||
pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + nBoundCols * sizeof(SColIdx);
|
||||
|
||||
if (nRows > 0) {
|
||||
pBuilder->rowInfo = calloc(nRows, sizeof(SMemRowInfo));
|
||||
if (pBuilder->rowInfo == NULL) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int i = 0; i < nRows; ++i) {
|
||||
(pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen;
|
||||
(pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen;
|
||||
}
|
||||
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
|
|
@ -440,13 +440,8 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
|
|||
}
|
||||
|
||||
if (!isParseBindParam) {
|
||||
// 2. check and set convert flag
|
||||
if (pBuilder->compareStat == ROW_COMPARE_NEED) {
|
||||
convertMemRow(row, spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE, pBuilder->kvRowInitLen);
|
||||
}
|
||||
|
||||
// 3. set the null value for the columns that do not assign values
|
||||
if ((spd->numOfBound < spd->numOfCols) && isDataRow(row) && !isNeedConvertRow(row)) {
|
||||
// set the null value for the columns that do not assign values
|
||||
if ((spd->numOfBound < spd->numOfCols) && isDataRow(row)) {
|
||||
SDataRow dataRow = memRowDataBody(row);
|
||||
for (int32_t i = 0; i < spd->numOfCols; ++i) {
|
||||
if (spd->cols[i].valStat == VAL_STAT_NONE) {
|
||||
|
@ -456,7 +451,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
|
|||
}
|
||||
}
|
||||
|
||||
*len = getExtendedRowSize(pDataBlocks);
|
||||
*len = pBuilder->rowSize;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -464,7 +459,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
|
|||
static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) {
|
||||
STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta);
|
||||
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
|
||||
CHECK_CODE(initMemRowBuilder(&pDataBlock->rowBuilder, 0, tinfo.numOfColumns, pDataBlock->boundColumnInfo.numOfBound, pDataBlock->boundColumnInfo.allNullLen));
|
||||
CHECK_CODE(initMemRowBuilder(&pDataBlock->rowBuilder, 0, &pDataBlock->boundColumnInfo));
|
||||
|
||||
(*numOfRows) = 0;
|
||||
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
|
||||
|
|
|
@ -402,7 +402,7 @@ static void createInputDataFilterInfo(SQueryStmtInfo* px, int32_t numOfCol1, int
|
|||
* @param dataBlocks
|
||||
* @return
|
||||
*/
|
||||
//int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, SName* name,
|
||||
// int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOffset, SName* name,
|
||||
// STableMeta* pTableMeta, STableDataBlocks** dataBlocks) {
|
||||
// STableDataBlocks* dataBuf = (STableDataBlocks*)calloc(1, sizeof(STableDataBlocks));
|
||||
// if (dataBuf == NULL) {
|
||||
|
@ -449,7 +449,7 @@ static void createInputDataFilterInfo(SQueryStmtInfo* px, int32_t numOfCol1, int
|
|||
// return TSDB_CODE_SUCCESS;
|
||||
//}
|
||||
//
|
||||
//int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
|
||||
// int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, int32_t startOffset, int32_t rowSize,
|
||||
// SName* name, STableMeta* pTableMeta, STableDataBlocks** dataBlocks,
|
||||
// SArray* pBlockList) {
|
||||
// *dataBlocks = NULL;
|
||||
|
@ -474,7 +474,7 @@ static void createInputDataFilterInfo(SQueryStmtInfo* px, int32_t numOfCol1, int
|
|||
//}
|
||||
//
|
||||
//// Erase the empty space reserved for binary data
|
||||
//static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SInsertStatementParam* insertParam,
|
||||
// static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SInsertStatementParam* insertParam,
|
||||
// SBlockKeyTuple* blkKeyTuple) {
|
||||
// // TODO: optimize this function, handle the case while binary is not presented
|
||||
// STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
|
||||
|
@ -536,27 +536,20 @@ static void createInputDataFilterInfo(SQueryStmtInfo* px, int32_t numOfCol1, int
|
|||
// }
|
||||
// } else {
|
||||
// for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
// char* payload = (blkKeyTuple + i)->payloadAddr;
|
||||
// if (isNeedConvertRow(payload)) {
|
||||
// convertSMemRow(pDataBlock, payload, pTableDataBlock);
|
||||
// TDRowTLenT rowTLen = memRowTLen(pDataBlock);
|
||||
// pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||
// pBlock->dataLen += rowTLen;
|
||||
// } else {
|
||||
// TDRowTLenT rowTLen = memRowTLen(payload);
|
||||
// memcpy(pDataBlock, payload, rowTLen);
|
||||
// pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||
// pBlock->dataLen += rowTLen;
|
||||
// }
|
||||
// char* payload = (blkKeyTuple + i)->payloadAddr;
|
||||
// TDRowLenT rowTLen = memRowTLen(payload);
|
||||
// memcpy(pDataBlock, payload, rowTLen);
|
||||
// pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||
// pBlock->dataLen += rowTLen;
|
||||
// }
|
||||
// }
|
||||
//
|
||||
|
||||
// int32_t len = pBlock->dataLen + pBlock->schemaLen;
|
||||
// pBlock->dataLen = htonl(pBlock->dataLen);
|
||||
// pBlock->schemaLen = htonl(pBlock->schemaLen);
|
||||
//
|
||||
|
||||
// return len;
|
||||
//}
|
||||
// }
|
||||
|
||||
TAOS_FIELD createField(const SSchema* pSchema) {
|
||||
TAOS_FIELD f = { .type = pSchema->type, .bytes = pSchema->bytes, };
|
||||
|
|
|
@ -113,8 +113,9 @@ typedef struct SParsedDataColInfo {
|
|||
int16_t numOfCols;
|
||||
int16_t numOfBound;
|
||||
uint16_t flen; // TODO: get from STSchema
|
||||
uint16_t allNullLen; // TODO: get from STSchema
|
||||
uint16_t allNullLen; // TODO: get from STSchema(base on SDataRow)
|
||||
uint16_t extendedVarLen;
|
||||
uint16_t boundNullLen; // bound column len with all NULL value(without VarDataOffsetT/SColIdx part)
|
||||
int32_t * boundedColumns; // bound column idx according to schema
|
||||
SBoundColumn * cols;
|
||||
SBoundIdxInfo *colIdxInfo;
|
||||
|
@ -130,7 +131,7 @@ typedef struct {
|
|||
typedef struct {
|
||||
uint8_t memRowType; // default is 0, that is SDataRow
|
||||
uint8_t compareStat; // 0 no need, 1 need compare
|
||||
TDRowTLenT kvRowInitLen;
|
||||
int32_t rowSize;
|
||||
SMemRowInfo *rowInfo;
|
||||
} SMemRowBuilder;
|
||||
|
||||
|
@ -141,8 +142,7 @@ typedef enum {
|
|||
|
||||
int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec);
|
||||
|
||||
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols,
|
||||
int32_t allNullLen);
|
||||
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo);
|
||||
void destroyMemRowBuilder(SMemRowBuilder *pBuilder);
|
||||
|
||||
/**
|
||||
|
@ -199,9 +199,9 @@ static FORCE_INLINE void tscAppendMemRowColVal(SMemRow row, const void *value, b
|
|||
}
|
||||
|
||||
// Applicable to consume by one row
|
||||
static FORCE_INLINE void tscAppendMemRowColValEx(SMemRow row, const void *value, bool isCopyVarData, int16_t colId,
|
||||
int8_t colType, int32_t toffset, int32_t *dataLen, int32_t *kvLen,
|
||||
uint8_t compareStat) {
|
||||
static FORCE_INLINE void tscAppendMemRowColVal(SMemRow row, const void *value, bool isCopyVarData, int16_t colId,
|
||||
int8_t colType, int32_t toffset, int32_t *dataLen, int32_t *kvLen,
|
||||
uint8_t compareStat) {
|
||||
tdAppendMemRowColVal(row, value, isCopyVarData, colId, colType, toffset);
|
||||
if (compareStat == ROW_COMPARE_NEED) {
|
||||
tdGetColAppendDeltaLen(value, colType, dataLen, kvLen);
|
||||
|
@ -513,16 +513,6 @@ static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
|
|||
return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void checkAndConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) {
|
||||
if (isDataRow(row)) {
|
||||
if (kvLen < (dataLen * KVRatioConvert)) {
|
||||
memRowSetConvert(row);
|
||||
}
|
||||
} else if (kvLen > dataLen) {
|
||||
memRowSetConvert(row);
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE void initSMemRow(SMemRow row, uint8_t memRowType, STableDataBlocks *pBlock, int16_t nBoundCols) {
|
||||
memRowSetType(row, memRowType);
|
||||
if (isDataRowT(memRowType)) {
|
||||
|
@ -622,8 +612,7 @@ static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
|
|||
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
|
||||
|
||||
static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, SMemRow row, char *msg, char **str,
|
||||
bool primaryKey, int16_t timePrec, int32_t toffset, int16_t colId,
|
||||
int32_t *dataLen, int32_t *kvLen, uint8_t compareStat) {
|
||||
bool primaryKey, int16_t timePrec, int32_t toffset, int16_t colId) {
|
||||
int64_t iv;
|
||||
int32_t ret;
|
||||
char * endptr = NULL;
|
||||
|
@ -635,26 +624,22 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
switch (pSchema->type) {
|
||||
case TSDB_DATA_TYPE_BOOL: { // bool
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
|
||||
if (strncmp(pToken->z, "true", pToken->n) == 0) {
|
||||
tscAppendMemRowColValEx(row, &TRUE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, &TRUE_VALUE, true, colId, pSchema->type, toffset);
|
||||
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
|
||||
tscAppendMemRowColValEx(row, &FALSE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tscAppendMemRowColVal(row, &FALSE_VALUE, true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z);
|
||||
}
|
||||
} else if (pToken->type == TK_INTEGER) {
|
||||
iv = strtoll(pToken->z, NULL, 10);
|
||||
tscAppendMemRowColValEx(row, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset,
|
||||
dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset);
|
||||
} else if (pToken->type == TK_FLOAT) {
|
||||
double dv = strtod(pToken->z, NULL);
|
||||
tscAppendMemRowColValEx(row, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset,
|
||||
dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z);
|
||||
}
|
||||
|
@ -664,8 +649,7 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -675,15 +659,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
uint8_t tmpVal = (uint8_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -693,15 +676,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
uint8_t tmpVal = (uint8_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -711,15 +693,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
int16_t tmpVal = (int16_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -729,15 +710,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
uint16_t tmpVal = (uint16_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -747,15 +727,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
int32_t tmpVal = (int32_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -765,15 +744,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
uint32_t tmpVal = (uint32_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -782,14 +760,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
tscAppendMemRowColValEx(row, &iv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, &iv, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
|
@ -799,14 +776,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
uint64_t tmpVal = (uint64_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
double dv;
|
||||
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
|
||||
|
@ -819,14 +795,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
}
|
||||
|
||||
float tmpVal = (float)dv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
double dv;
|
||||
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
|
||||
|
@ -837,15 +812,14 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
return tscInvalidOperationMsg(msg, "illegal double data", pToken->z);
|
||||
}
|
||||
|
||||
tscAppendMemRowColValEx(row, &dv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, &dv, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost
|
||||
if (pToken->type == TK_NULL) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else { // too long values will return invalid sql, not be truncated automatically
|
||||
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
|
||||
return tscInvalidOperationMsg(msg, "string data overflow", pToken->z);
|
||||
|
@ -853,14 +827,13 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
// STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
|
||||
char *rowEnd = memRowEnd(row);
|
||||
STR_WITH_SIZE_TO_VARSTR(rowEnd, pToken->z, pToken->n);
|
||||
tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, rowEnd, false, colId, pSchema->type, toffset);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
if (pToken->type == TK_NULL) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
|
||||
int32_t output = 0;
|
||||
|
@ -872,7 +845,7 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
return tscInvalidOperationMsg(msg, buf, pToken->z);
|
||||
}
|
||||
varDataSetLen(rowEnd, output);
|
||||
tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, rowEnd, false, colId, pSchema->type, toffset);
|
||||
}
|
||||
break;
|
||||
|
||||
|
@ -881,17 +854,16 @@ static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pTok
|
|||
if (primaryKey) {
|
||||
// When building SKVRow primaryKey, we should not skip even with NULL value.
|
||||
int64_t tmpVal = 0;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
} else {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
tscAppendMemRowColVal(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset);
|
||||
}
|
||||
} else {
|
||||
int64_t tmpVal;
|
||||
if (tsParseTime(pToken, &tmpVal, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z);
|
||||
}
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
tscAppendMemRowColVal(row, &tmpVal, true, colId, pSchema->type, toffset);
|
||||
}
|
||||
|
||||
break;
|
||||
|
|
|
@ -41,9 +41,8 @@ enum {
|
|||
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t *numOfRows);
|
||||
static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo *pColInfo, SSchema *pSchema,
|
||||
char *str, char **end);
|
||||
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols,
|
||||
int32_t allNullLen) {
|
||||
ASSERT(nRows >= 0 && nCols > 0 && (nBoundCols <= nCols));
|
||||
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, SParsedDataColInfo *pColInfo) {
|
||||
ASSERT(nRows >= 0 && pColInfo->numOfCols > 0 && (pColInfo->numOfBound <= pColInfo->numOfCols));
|
||||
if (nRows > 0) {
|
||||
// already init(bind multiple rows by single column)
|
||||
if (pBuilder->compareStat == ROW_COMPARE_NEED && (pBuilder->rowInfo != NULL)) {
|
||||
|
@ -51,41 +50,12 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3
|
|||
}
|
||||
}
|
||||
|
||||
// default compareStat is ROW_COMPARE_NO_NEED
|
||||
if (nBoundCols == 0) { // file input
|
||||
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
uint32_t dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + pColInfo->allNullLen;
|
||||
uint32_t kvLen = TD_MEM_ROW_KV_HEAD_SIZE + pColInfo->numOfBound * sizeof(SColIdx) + pColInfo->boundNullLen;
|
||||
if (isUtilizeKVRow(kvLen, dataLen)) {
|
||||
pBuilder->memRowType = SMEM_ROW_KV;
|
||||
} else {
|
||||
float boundRatio = ((float)nBoundCols / (float)nCols);
|
||||
|
||||
if (boundRatio < KVRatioKV) {
|
||||
pBuilder->memRowType = SMEM_ROW_KV;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (boundRatio > KVRatioData) {
|
||||
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
pBuilder->compareStat = ROW_COMPARE_NEED;
|
||||
|
||||
if (boundRatio < KVRatioPredict) {
|
||||
pBuilder->memRowType = SMEM_ROW_KV;
|
||||
} else {
|
||||
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||
}
|
||||
}
|
||||
|
||||
pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + nBoundCols * sizeof(SColIdx);
|
||||
|
||||
if (nRows > 0) {
|
||||
pBuilder->rowInfo = tcalloc(nRows, sizeof(SMemRowInfo));
|
||||
if (pBuilder->rowInfo == NULL) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int i = 0; i < nRows; ++i) {
|
||||
(pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen;
|
||||
(pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen;
|
||||
}
|
||||
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
|
@ -457,8 +427,6 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
|||
STableMeta * pTableMeta = pDataBlocks->pTableMeta;
|
||||
SSchema * schema = tscGetTableSchema(pTableMeta);
|
||||
SMemRowBuilder * pBuilder = &pDataBlocks->rowBuilder;
|
||||
int32_t dataLen = spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE;
|
||||
int32_t kvLen = pBuilder->kvRowInitLen;
|
||||
bool isParseBindParam = false;
|
||||
|
||||
initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound);
|
||||
|
@ -535,8 +503,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
|||
int16_t colId = -1;
|
||||
tscGetMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &toffset, &colId);
|
||||
|
||||
int32_t ret = tsParseOneColumnKV(pSchema, &sToken, row, pInsertParam->msg, str, isPrimaryKey, timePrec, toffset,
|
||||
colId, &dataLen, &kvLen, pBuilder->compareStat);
|
||||
int32_t ret =
|
||||
tsParseOneColumnKV(pSchema, &sToken, row, pInsertParam->msg, str, isPrimaryKey, timePrec, toffset, colId);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
@ -551,13 +519,8 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
|||
}
|
||||
|
||||
if (!isParseBindParam) {
|
||||
// 2. check and set convert flag
|
||||
if (pBuilder->compareStat == ROW_COMPARE_NEED) {
|
||||
checkAndConvertMemRow(row, dataLen, kvLen);
|
||||
}
|
||||
|
||||
// 3. set the null value for the columns that do not assign values
|
||||
if ((spd->numOfBound < spd->numOfCols) && isDataRow(row) && !isNeedConvertRow(row)) {
|
||||
// set the null value for the columns that do not assign values
|
||||
if ((spd->numOfBound < spd->numOfCols) && isDataRow(row)) {
|
||||
SDataRow dataRow = memRowDataBody(row);
|
||||
for (int32_t i = 0; i < spd->numOfCols; ++i) {
|
||||
if (spd->cols[i].valStat == VAL_STAT_NONE) {
|
||||
|
@ -567,7 +530,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
|||
}
|
||||
}
|
||||
|
||||
*len = getExtendedRowSize(pDataBlocks);
|
||||
*len = pBuilder->rowSize;
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
@ -620,11 +583,10 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
|
|||
|
||||
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
|
||||
|
||||
if (TSDB_CODE_SUCCESS !=
|
||||
(code = initMemRowBuilder(&pDataBlock->rowBuilder, 0, tinfo.numOfColumns, pDataBlock->boundColumnInfo.numOfBound,
|
||||
pDataBlock->boundColumnInfo.allNullLen))) {
|
||||
if (TSDB_CODE_SUCCESS != (code = initMemRowBuilder(&pDataBlock->rowBuilder, 0, &pDataBlock->boundColumnInfo))) {
|
||||
return code;
|
||||
}
|
||||
pDataBlock->rowBuilder.rowSize = extendedRowSize;
|
||||
while (1) {
|
||||
index = 0;
|
||||
sToken = tStrGetToken(*str, &index, false);
|
||||
|
@ -703,6 +665,7 @@ void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32
|
|||
pColInfo->boundedColumns[i] = i;
|
||||
}
|
||||
pColInfo->allNullLen += pColInfo->flen;
|
||||
pColInfo->boundNullLen = pColInfo->allNullLen; // default set allNullLen
|
||||
pColInfo->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
|
||||
}
|
||||
|
||||
|
@ -1200,6 +1163,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
|
|||
int32_t nCols = pColInfo->numOfCols;
|
||||
|
||||
pColInfo->numOfBound = 0;
|
||||
pColInfo->boundNullLen = 0;
|
||||
memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * nCols);
|
||||
for (int32_t i = 0; i < nCols; ++i) {
|
||||
pColInfo->cols[i].valStat = VAL_STAT_NONE;
|
||||
|
@ -1249,6 +1213,17 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
|
|||
pColInfo->cols[t].valStat = VAL_STAT_HAS;
|
||||
pColInfo->boundedColumns[pColInfo->numOfBound] = t;
|
||||
++pColInfo->numOfBound;
|
||||
switch (pSchema[t].type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
|
||||
break;
|
||||
default:
|
||||
pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type];
|
||||
break;
|
||||
}
|
||||
findColumnIndex = true;
|
||||
if (isOrdered && (lastColIdx > t)) {
|
||||
isOrdered = false;
|
||||
|
@ -1272,6 +1247,17 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
|
|||
pColInfo->cols[t].valStat = VAL_STAT_HAS;
|
||||
pColInfo->boundedColumns[pColInfo->numOfBound] = t;
|
||||
++pColInfo->numOfBound;
|
||||
switch (pSchema[t].type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
pColInfo->boundNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
|
||||
break;
|
||||
default:
|
||||
pColInfo->boundNullLen += TYPE_BYTES[pSchema[t].type];
|
||||
break;
|
||||
}
|
||||
findColumnIndex = true;
|
||||
if (isOrdered && (lastColIdx > t)) {
|
||||
isOrdered = false;
|
||||
|
@ -1715,12 +1701,17 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
|
|||
goto _error;
|
||||
}
|
||||
|
||||
tscAllocateMemIfNeed(pTableDataBlock, getExtendedRowSize(pTableDataBlock), &maxRows);
|
||||
int32_t extendedRowSize = getExtendedRowSize(pTableDataBlock);
|
||||
tscAllocateMemIfNeed(pTableDataBlock, extendedRowSize, &maxRows);
|
||||
tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW);
|
||||
if (tokenBuf == NULL) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
// insert from .csv means full and ordered columns, thus use SDataRow all the time
|
||||
ASSERT(SMEM_ROW_DATA == pTableDataBlock->rowBuilder.memRowType);
|
||||
pTableDataBlock->rowBuilder.rowSize = extendedRowSize;
|
||||
|
||||
while ((readLen = tgetline(&line, &n, fp)) != -1) {
|
||||
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
|
||||
|
|
|
@ -1881,18 +1881,11 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI
|
|||
}
|
||||
} else {
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
char* payload = (blkKeyTuple + i)->payloadAddr;
|
||||
if (isNeedConvertRow(payload)) {
|
||||
convertSMemRow(pDataBlock, payload, pTableDataBlock);
|
||||
TDRowTLenT rowTLen = memRowTLen(pDataBlock);
|
||||
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||
pBlock->dataLen += rowTLen;
|
||||
} else {
|
||||
TDRowTLenT rowTLen = memRowTLen(payload);
|
||||
memcpy(pDataBlock, payload, rowTLen);
|
||||
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||
pBlock->dataLen += rowTLen;
|
||||
}
|
||||
char* payload = (blkKeyTuple + i)->payloadAddr;
|
||||
TDRowLenT rowTLen = memRowTLen(payload);
|
||||
memcpy(pDataBlock, payload, rowTLen);
|
||||
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||
pBlock->dataLen += rowTLen;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue