Merge pull request #7132 from taosdata/enhance/TD-5622
<TD-5622><enhance>:generate SMemRow from source
This commit is contained in:
commit
db2b89741b
|
@ -84,9 +84,14 @@ typedef struct SParamInfo {
|
|||
} SParamInfo;
|
||||
|
||||
typedef struct SBoundColumn {
|
||||
bool hasVal; // denote if current column has bound or not
|
||||
int32_t offset; // all column offset value
|
||||
int32_t offset; // all column offset value
|
||||
int32_t toffset; // first part offset for SDataRow TODO: get offset from STSchema on future
|
||||
uint8_t valStat; // denote if current column bound or not(0 means has val, 1 means no val)
|
||||
} SBoundColumn;
|
||||
typedef enum {
|
||||
VAL_STAT_HAS = 0x0, // 0 means has val
|
||||
VAL_STAT_NONE = 0x01, // 1 means no val
|
||||
} EValStat;
|
||||
|
||||
typedef struct {
|
||||
uint16_t schemaColIdx;
|
||||
|
@ -99,32 +104,106 @@ typedef enum _COL_ORDER_STATUS {
|
|||
ORDER_STATUS_ORDERED = 1,
|
||||
ORDER_STATUS_DISORDERED = 2,
|
||||
} EOrderStatus;
|
||||
|
||||
typedef struct SParsedDataColInfo {
|
||||
int16_t numOfCols;
|
||||
int16_t numOfBound;
|
||||
int32_t * boundedColumns; // bounded column idx according to schema
|
||||
uint16_t flen; // TODO: get from STSchema
|
||||
uint16_t allNullLen; // TODO: get from STSchema
|
||||
uint16_t extendedVarLen;
|
||||
int32_t * boundedColumns; // bound column idx according to schema
|
||||
SBoundColumn * cols;
|
||||
SBoundIdxInfo *colIdxInfo;
|
||||
int8_t orderStatus; // bounded columns:
|
||||
int8_t orderStatus; // bound columns
|
||||
} SParsedDataColInfo;
|
||||
|
||||
#define IS_DATA_COL_ORDERED(s) ((s) == (int8_t)ORDER_STATUS_ORDERED)
|
||||
#define IS_DATA_COL_ORDERED(spd) ((spd->orderStatus) == (int8_t)ORDER_STATUS_ORDERED)
|
||||
|
||||
typedef struct {
|
||||
SSchema * pSchema;
|
||||
int16_t sversion;
|
||||
int32_t flen;
|
||||
uint16_t nCols;
|
||||
void * buf;
|
||||
void * pDataBlock;
|
||||
SSubmitBlk *pSubmitBlk;
|
||||
int32_t dataLen; // len of SDataRow
|
||||
int32_t kvLen; // len of SKVRow
|
||||
} SMemRowInfo;
|
||||
typedef struct {
|
||||
uint8_t memRowType;
|
||||
uint8_t compareStat; // 0 unknown, 1 need compare, 2 no need
|
||||
TDRowTLenT dataRowInitLen;
|
||||
TDRowTLenT kvRowInitLen;
|
||||
SMemRowInfo *rowInfo;
|
||||
} SMemRowBuilder;
|
||||
|
||||
typedef struct {
|
||||
TDRowLenT allNullLen;
|
||||
} SMemRowHelper;
|
||||
typedef enum {
|
||||
ROW_COMPARE_UNKNOWN = 0,
|
||||
ROW_COMPARE_NEED = 1,
|
||||
ROW_COMPARE_NO_NEED = 2,
|
||||
} ERowCompareStat;
|
||||
|
||||
int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec);
|
||||
|
||||
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols,
|
||||
int32_t allNullLen);
|
||||
void destroyMemRowBuilder(SMemRowBuilder *pBuilder);
|
||||
|
||||
/**
|
||||
* @brief
|
||||
*
|
||||
* @param memRowType
|
||||
* @param spd
|
||||
* @param idx the absolute bound index of columns
|
||||
* @return FORCE_INLINE
|
||||
*/
|
||||
static FORCE_INLINE void tscGetMemRowAppendInfo(SSchema *pSchema, uint8_t memRowType, SParsedDataColInfo *spd,
|
||||
int32_t idx, int32_t *toffset, int16_t *colId) {
|
||||
int32_t schemaIdx = 0;
|
||||
if (IS_DATA_COL_ORDERED(spd)) {
|
||||
schemaIdx = spd->boundedColumns[idx];
|
||||
if (isDataRowT(memRowType)) {
|
||||
*toffset = (spd->cols + schemaIdx)->toffset; // the offset of firstPart
|
||||
} else {
|
||||
*toffset = idx * sizeof(SColIdx); // the offset of SColIdx
|
||||
}
|
||||
} else {
|
||||
ASSERT(idx == (spd->colIdxInfo + idx)->boundIdx);
|
||||
schemaIdx = (spd->colIdxInfo + idx)->schemaColIdx;
|
||||
if (isDataRowT(memRowType)) {
|
||||
*toffset = (spd->cols + schemaIdx)->toffset;
|
||||
} else {
|
||||
*toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SColIdx);
|
||||
}
|
||||
}
|
||||
*colId = pSchema[schemaIdx].colId;
|
||||
}
|
||||
|
||||
/**
|
||||
* @brief Applicable to consume by multi-columns
|
||||
*
|
||||
* @param row
|
||||
* @param value
|
||||
* @param isCopyVarData In some scenario, the varVal is copied to row directly before calling tdAppend***ColVal()
|
||||
* @param colId
|
||||
* @param colType
|
||||
* @param idx index in SSchema
|
||||
* @param pBuilder
|
||||
* @param spd
|
||||
* @return FORCE_INLINE
|
||||
*/
|
||||
static FORCE_INLINE void tscAppendMemRowColVal(SMemRow row, const void *value, bool isCopyVarData, int16_t colId,
|
||||
int8_t colType, int32_t toffset, SMemRowBuilder *pBuilder,
|
||||
int32_t rowNum) {
|
||||
tdAppendMemRowColVal(row, value, isCopyVarData, colId, colType, toffset);
|
||||
if (pBuilder->compareStat == ROW_COMPARE_NEED) {
|
||||
SMemRowInfo *pRowInfo = pBuilder->rowInfo + rowNum;
|
||||
tdGetColAppendDeltaLen(value, colType, &pRowInfo->dataLen, &pRowInfo->kvLen);
|
||||
}
|
||||
}
|
||||
|
||||
// Applicable to consume by one row
|
||||
static FORCE_INLINE void tscAppendMemRowColValEx(SMemRow row, const void *value, bool isCopyVarData, int16_t colId,
|
||||
int8_t colType, int32_t toffset, int32_t *dataLen, int32_t *kvLen,
|
||||
uint8_t compareStat) {
|
||||
tdAppendMemRowColVal(row, value, isCopyVarData, colId, colType, toffset);
|
||||
if (compareStat == ROW_COMPARE_NEED) {
|
||||
tdGetColAppendDeltaLen(value, colType, dataLen, kvLen);
|
||||
}
|
||||
}
|
||||
typedef struct STableDataBlocks {
|
||||
SName tableName;
|
||||
int8_t tsSource; // where does the UNIX timestamp come from, server or client
|
||||
|
@ -146,7 +225,7 @@ typedef struct STableDataBlocks {
|
|||
uint32_t numOfAllocedParams;
|
||||
uint32_t numOfParams;
|
||||
SParamInfo * params;
|
||||
SMemRowHelper rowHelper;
|
||||
SMemRowBuilder rowBuilder;
|
||||
} STableDataBlocks;
|
||||
|
||||
typedef struct {
|
||||
|
@ -435,8 +514,398 @@ int16_t getNewResColId(SSqlCmd* pCmd);
|
|||
|
||||
int32_t schemaIdxCompar(const void *lhs, const void *rhs);
|
||||
int32_t boundIdxCompar(const void *lhs, const void *rhs);
|
||||
int initSMemRowHelper(SMemRowHelper *pHelper, SSchema *pSSchema, uint16_t nCols, uint16_t allNullColsLen);
|
||||
int32_t getExtendedRowSize(STableComInfo *tinfo);
|
||||
static FORCE_INLINE int32_t getExtendedRowSize(STableDataBlocks *pBlock) {
|
||||
ASSERT(pBlock->rowSize == pBlock->pTableMeta->tableInfo.rowSize);
|
||||
return pBlock->rowSize + TD_MEM_ROW_DATA_HEAD_SIZE + pBlock->boundColumnInfo.extendedVarLen;
|
||||
}
|
||||
|
||||
static FORCE_INLINE void checkAndConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) {
|
||||
if (isDataRow(row)) {
|
||||
if (kvLen < (dataLen * KVRatioConvert)) {
|
||||
memRowSetConvert(row);
|
||||
}
|
||||
} else if (kvLen > dataLen) {
|
||||
memRowSetConvert(row);
|
||||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE void initSMemRow(SMemRow row, uint8_t memRowType, STableDataBlocks *pBlock, int16_t nBoundCols) {
|
||||
memRowSetType(row, memRowType);
|
||||
if (isDataRowT(memRowType)) {
|
||||
dataRowSetVersion(memRowDataBody(row), pBlock->pTableMeta->sversion);
|
||||
dataRowSetLen(memRowDataBody(row), (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pBlock->boundColumnInfo.flen));
|
||||
} else {
|
||||
ASSERT(nBoundCols > 0);
|
||||
memRowSetKvVersion(row, pBlock->pTableMeta->sversion);
|
||||
kvRowSetNCols(memRowKvBody(row), nBoundCols);
|
||||
kvRowSetLen(memRowKvBody(row), (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nBoundCols));
|
||||
}
|
||||
}
|
||||
/**
|
||||
* TODO: Move to tdataformat.h and refactor when STSchema available.
|
||||
* - fetch flen and toffset from STSChema and remove param spd
|
||||
*/
|
||||
static FORCE_INLINE void convertToSDataRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols,
|
||||
SParsedDataColInfo *spd) {
|
||||
ASSERT(isKvRow(src));
|
||||
SKVRow kvRow = memRowKvBody(src);
|
||||
SDataRow dataRow = memRowDataBody(dest);
|
||||
|
||||
memRowSetType(dest, SMEM_ROW_DATA);
|
||||
dataRowSetVersion(dataRow, memRowKvVersion(src));
|
||||
dataRowSetLen(dataRow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + spd->flen));
|
||||
|
||||
int32_t kvIdx = 0;
|
||||
for (int i = 0; i < nCols; ++i) {
|
||||
SSchema *schema = pSchema + i;
|
||||
void * val = tdGetKVRowValOfColEx(kvRow, schema->colId, &kvIdx);
|
||||
tdAppendDataColVal(dataRow, val != NULL ? val : getNullValue(schema->type), true, schema->type,
|
||||
(spd->cols + i)->toffset);
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Move to tdataformat.h and refactor when STSchema available.
|
||||
static FORCE_INLINE void convertToSKVRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, int nBoundCols,
|
||||
SParsedDataColInfo *spd) {
|
||||
ASSERT(isDataRow(src));
|
||||
|
||||
SDataRow dataRow = memRowDataBody(src);
|
||||
SKVRow kvRow = memRowKvBody(dest);
|
||||
|
||||
memRowSetType(dest, SMEM_ROW_KV);
|
||||
memRowSetKvVersion(kvRow, dataRowVersion(dataRow));
|
||||
kvRowSetNCols(kvRow, nBoundCols);
|
||||
kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nBoundCols));
|
||||
|
||||
int32_t toffset = 0, kvOffset = 0;
|
||||
for (int i = 0; i < nCols; ++i) {
|
||||
if ((spd->cols + i)->valStat == VAL_STAT_HAS) {
|
||||
SSchema *schema = pSchema + i;
|
||||
toffset = (spd->cols + i)->toffset;
|
||||
void *val = tdGetRowDataOfCol(dataRow, schema->type, toffset + TD_DATA_ROW_HEAD_SIZE);
|
||||
tdAppendKvColVal(kvRow, val, true, schema->colId, schema->type, kvOffset);
|
||||
kvOffset += sizeof(SColIdx);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: Move to tdataformat.h and refactor when STSchema available.
|
||||
static FORCE_INLINE void convertSMemRow(SMemRow dest, SMemRow src, STableDataBlocks *pBlock) {
|
||||
STableMeta * pTableMeta = pBlock->pTableMeta;
|
||||
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
|
||||
SSchema * pSchema = tscGetTableSchema(pTableMeta);
|
||||
SParsedDataColInfo *spd = &pBlock->boundColumnInfo;
|
||||
|
||||
ASSERT(dest != src);
|
||||
|
||||
if (isDataRow(src)) {
|
||||
// TODO: Can we use pBlock -> numOfParam directly?
|
||||
ASSERT(spd->numOfBound > 0);
|
||||
convertToSKVRow(dest, src, pSchema, tinfo.numOfColumns, spd->numOfBound, spd);
|
||||
} else {
|
||||
convertToSDataRow(dest, src, pSchema, tinfo.numOfColumns, spd);
|
||||
}
|
||||
}
|
||||
|
||||
static bool isNullStr(SStrToken *pToken) {
|
||||
return (pToken->type == TK_NULL) || ((pToken->type == TK_STRING) && (pToken->n != 0) &&
|
||||
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0));
|
||||
}
|
||||
|
||||
static FORCE_INLINE int32_t tscToDouble(SStrToken *pToken, double *value, char **endPtr) {
|
||||
errno = 0;
|
||||
*value = strtold(pToken->z, endPtr);
|
||||
|
||||
// not a valid integer number, return error
|
||||
if ((*endPtr - pToken->z) != pToken->n) {
|
||||
return TK_ILLEGAL;
|
||||
}
|
||||
|
||||
return pToken->type;
|
||||
}
|
||||
|
||||
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
|
||||
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
|
||||
|
||||
static FORCE_INLINE int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, SMemRow row, char *msg, char **str,
|
||||
bool primaryKey, int16_t timePrec, int32_t toffset, int16_t colId,
|
||||
int32_t *dataLen, int32_t *kvLen, uint8_t compareStat) {
|
||||
int64_t iv;
|
||||
int32_t ret;
|
||||
char * endptr = NULL;
|
||||
|
||||
if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) {
|
||||
return tscInvalidOperationMsg(msg, "invalid numeric data", pToken->z);
|
||||
}
|
||||
|
||||
switch (pSchema->type) {
|
||||
case TSDB_DATA_TYPE_BOOL: { // bool
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
} else {
|
||||
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
|
||||
if (strncmp(pToken->z, "true", pToken->n) == 0) {
|
||||
tscAppendMemRowColValEx(row, &TRUE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
|
||||
tscAppendMemRowColValEx(row, &FALSE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
} else {
|
||||
return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z);
|
||||
}
|
||||
} else if (pToken->type == TK_INTEGER) {
|
||||
iv = strtoll(pToken->z, NULL, 10);
|
||||
tscAppendMemRowColValEx(row, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset,
|
||||
dataLen, kvLen, compareStat);
|
||||
} else if (pToken->type == TK_FLOAT) {
|
||||
double dv = strtod(pToken->z, NULL);
|
||||
tscAppendMemRowColValEx(row, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset,
|
||||
dataLen, kvLen, compareStat);
|
||||
} else {
|
||||
return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid tinyint data", pToken->z);
|
||||
} else if (!IS_VALID_TINYINT(iv)) {
|
||||
return tscInvalidOperationMsg(msg, "data overflow", pToken->z);
|
||||
}
|
||||
|
||||
uint8_t tmpVal = (uint8_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid unsigned tinyint data", pToken->z);
|
||||
} else if (!IS_VALID_UTINYINT(iv)) {
|
||||
return tscInvalidOperationMsg(msg, "unsigned tinyint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
uint8_t tmpVal = (uint8_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid smallint data", pToken->z);
|
||||
} else if (!IS_VALID_SMALLINT(iv)) {
|
||||
return tscInvalidOperationMsg(msg, "smallint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
int16_t tmpVal = (int16_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid unsigned smallint data", pToken->z);
|
||||
} else if (!IS_VALID_USMALLINT(iv)) {
|
||||
return tscInvalidOperationMsg(msg, "unsigned smallint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
uint16_t tmpVal = (uint16_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid int data", pToken->z);
|
||||
} else if (!IS_VALID_INT(iv)) {
|
||||
return tscInvalidOperationMsg(msg, "int data overflow", pToken->z);
|
||||
}
|
||||
|
||||
int32_t tmpVal = (int32_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid unsigned int data", pToken->z);
|
||||
} else if (!IS_VALID_UINT(iv)) {
|
||||
return tscInvalidOperationMsg(msg, "unsigned int data overflow", pToken->z);
|
||||
}
|
||||
|
||||
uint32_t tmpVal = (uint32_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid bigint data", pToken->z);
|
||||
} else if (!IS_VALID_BIGINT(iv)) {
|
||||
return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
tscAppendMemRowColValEx(row, &iv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid unsigned bigint data", pToken->z);
|
||||
} else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
|
||||
return tscInvalidOperationMsg(msg, "unsigned bigint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
uint64_t tmpVal = (uint64_t)iv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
} else {
|
||||
double dv;
|
||||
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
|
||||
return tscInvalidOperationMsg(msg, "illegal float data", pToken->z);
|
||||
}
|
||||
|
||||
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
|
||||
isnan(dv)) {
|
||||
return tscInvalidOperationMsg(msg, "illegal float data", pToken->z);
|
||||
}
|
||||
|
||||
float tmpVal = (float)dv;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
if (isNullStr(pToken)) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
} else {
|
||||
double dv;
|
||||
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
|
||||
return tscInvalidOperationMsg(msg, "illegal double data", pToken->z);
|
||||
}
|
||||
|
||||
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
|
||||
return tscInvalidOperationMsg(msg, "illegal double data", pToken->z);
|
||||
}
|
||||
|
||||
tscAppendMemRowColValEx(row, &dv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost
|
||||
if (pToken->type == TK_NULL) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
} else { // too long values will return invalid sql, not be truncated automatically
|
||||
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
|
||||
return tscInvalidOperationMsg(msg, "string data overflow", pToken->z);
|
||||
}
|
||||
// STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
|
||||
char *rowEnd = memRowEnd(row);
|
||||
STR_WITH_SIZE_TO_VARSTR(rowEnd, pToken->z, pToken->n);
|
||||
tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
if (pToken->type == TK_NULL) {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
} else {
|
||||
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
|
||||
int32_t output = 0;
|
||||
char * rowEnd = memRowEnd(row);
|
||||
if (!taosMbsToUcs4(pToken->z, pToken->n, (char *)varDataVal(rowEnd), pSchema->bytes - VARSTR_HEADER_SIZE,
|
||||
&output)) {
|
||||
char buf[512] = {0};
|
||||
snprintf(buf, tListLen(buf), "%s", strerror(errno));
|
||||
return tscInvalidOperationMsg(msg, buf, pToken->z);
|
||||
}
|
||||
varDataSetLen(rowEnd, output);
|
||||
tscAppendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_TIMESTAMP: {
|
||||
if (pToken->type == TK_NULL) {
|
||||
if (primaryKey) {
|
||||
// When building SKVRow primaryKey, we should not skip even with NULL value.
|
||||
int64_t tmpVal = 0;
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
} else {
|
||||
tscAppendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
|
||||
compareStat);
|
||||
}
|
||||
} else {
|
||||
int64_t tmpVal;
|
||||
if (tsParseTime(pToken, &tmpVal, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z);
|
||||
}
|
||||
tscAppendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -38,43 +38,60 @@ enum {
|
|||
TSDB_USE_CLI_TS = 1,
|
||||
};
|
||||
|
||||
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
|
||||
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
|
||||
|
||||
static int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t *numOfRows);
|
||||
static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo *pColInfo, SSchema *pSchema,
|
||||
char *str, char **end);
|
||||
|
||||
int32_t getExtendedRowSize(STableComInfo *tinfo) {
|
||||
return tinfo->rowSize + PAYLOAD_HEADER_LEN + PAYLOAD_COL_HEAD_LEN * tinfo->numOfColumns;
|
||||
}
|
||||
int initSMemRowHelper(SMemRowHelper *pHelper, SSchema *pSSchema, uint16_t nCols, uint16_t allNullColsLen) {
|
||||
pHelper->allNullLen = allNullColsLen; // TODO: get allNullColsLen when creating or altering table meta
|
||||
if (pHelper->allNullLen == 0) {
|
||||
for (uint16_t i = 0; i < nCols; ++i) {
|
||||
uint8_t type = pSSchema[i].type;
|
||||
int32_t typeLen = TYPE_BYTES[type];
|
||||
pHelper->allNullLen += typeLen;
|
||||
if (TSDB_DATA_TYPE_BINARY == type) {
|
||||
pHelper->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
|
||||
} else if (TSDB_DATA_TYPE_NCHAR == type) {
|
||||
int len = VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE;
|
||||
pHelper->allNullLen += len;
|
||||
}
|
||||
int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint32_t nCols, uint32_t nBoundCols,
|
||||
int32_t allNullLen) {
|
||||
ASSERT(nRows >= 0 && nCols > 0 && (nBoundCols <= nCols));
|
||||
if (nRows > 0) {
|
||||
// already init(bind multiple rows by single column)
|
||||
if (pBuilder->compareStat == ROW_COMPARE_NEED && (pBuilder->rowInfo != NULL)) {
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
static int32_t tscToDouble(SStrToken *pToken, double *value, char **endPtr) {
|
||||
errno = 0;
|
||||
*value = strtold(pToken->z, endPtr);
|
||||
|
||||
// not a valid integer number, return error
|
||||
if ((*endPtr - pToken->z) != pToken->n) {
|
||||
return TK_ILLEGAL;
|
||||
|
||||
if (nBoundCols == 0) { // file input
|
||||
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||
pBuilder->compareStat = ROW_COMPARE_NO_NEED;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else {
|
||||
float boundRatio = ((float)nBoundCols / (float)nCols);
|
||||
|
||||
if (boundRatio < KVRatioKV) {
|
||||
pBuilder->memRowType = SMEM_ROW_KV;
|
||||
pBuilder->compareStat = ROW_COMPARE_NO_NEED;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
} else if (boundRatio > KVRatioData) {
|
||||
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||
pBuilder->compareStat = ROW_COMPARE_NO_NEED;
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
pBuilder->compareStat = ROW_COMPARE_NEED;
|
||||
|
||||
if (boundRatio < KVRatioPredict) {
|
||||
pBuilder->memRowType = SMEM_ROW_KV;
|
||||
} else {
|
||||
pBuilder->memRowType = SMEM_ROW_DATA;
|
||||
}
|
||||
}
|
||||
|
||||
return pToken->type;
|
||||
pBuilder->dataRowInitLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen;
|
||||
pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + nBoundCols * sizeof(SColIdx);
|
||||
|
||||
if (nRows > 0) {
|
||||
pBuilder->rowInfo = tcalloc(nRows, sizeof(SMemRowInfo));
|
||||
if (pBuilder->rowInfo == NULL) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
for (int i = 0; i < nRows; ++i) {
|
||||
(pBuilder->rowInfo + i)->dataLen = pBuilder->dataRowInitLen;
|
||||
(pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec) {
|
||||
|
@ -146,10 +163,6 @@ int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int1
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static bool isNullStr(SStrToken* pToken) {
|
||||
return (pToken->type == TK_NULL) || ((pToken->type == TK_STRING) && (pToken->n != 0) &&
|
||||
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0));
|
||||
}
|
||||
int32_t tsParseOneColumn(SSchema *pSchema, SStrToken *pToken, char *payload, char *msg, char **str, bool primaryKey,
|
||||
int16_t timePrec) {
|
||||
int64_t iv;
|
||||
|
@ -400,342 +413,6 @@ int32_t tsParseOneColumn(SSchema *pSchema, SStrToken *pToken, char *payload, cha
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static FORCE_INLINE TDRowLenT tsSetPayloadColValue(char *payloadStart, char *payload, int16_t columnId,
|
||||
uint8_t columnType, const void *value, uint16_t valueLen, TDRowTLenT tOffset) {
|
||||
payloadColSetId(payload, columnId);
|
||||
payloadColSetType(payload, columnType);
|
||||
memcpy(POINTER_SHIFT(payloadStart,tOffset), value, valueLen);
|
||||
return valueLen;
|
||||
}
|
||||
|
||||
static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *payloadStart, char *primaryKeyStart,
|
||||
char *payload, char *msg, char **str, bool primaryKey, int16_t timePrec,
|
||||
TDRowTLenT tOffset, TDRowLenT *sizeAppend, TDRowLenT *dataRowColDeltaLen,
|
||||
TDRowLenT *kvRowColLen) {
|
||||
int64_t iv;
|
||||
int32_t ret;
|
||||
char * endptr = NULL;
|
||||
|
||||
if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) {
|
||||
return tscInvalidOperationMsg(msg, "invalid numeric data", pToken->z);
|
||||
}
|
||||
|
||||
switch (pSchema->type) {
|
||||
case TSDB_DATA_TYPE_BOOL: { // bool
|
||||
if (isNullStr(pToken)) {
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
|
||||
getNullValue(TSDB_DATA_TYPE_BOOL), TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset);
|
||||
} else {
|
||||
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
|
||||
if (strncmp(pToken->z, "true", pToken->n) == 0) {
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &TRUE_VALUE,
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
|
||||
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &FALSE_VALUE,
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
|
||||
} else {
|
||||
return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z);
|
||||
}
|
||||
} else if (pToken->type == TK_INTEGER) {
|
||||
iv = strtoll(pToken->z, NULL, 10);
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
|
||||
((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
|
||||
} else if (pToken->type == TK_FLOAT) {
|
||||
double dv = strtod(pToken->z, NULL);
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
|
||||
((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
|
||||
} else {
|
||||
return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z);
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
case TSDB_DATA_TYPE_TINYINT:
|
||||
if (isNullStr(pToken)) {
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
|
||||
getNullValue(TSDB_DATA_TYPE_TINYINT), TYPE_BYTES[TSDB_DATA_TYPE_TINYINT], tOffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid tinyint data", pToken->z);
|
||||
} else if (!IS_VALID_TINYINT(iv)) {
|
||||
return tscInvalidOperationMsg(msg, "data overflow", pToken->z);
|
||||
}
|
||||
|
||||
uint8_t tmpVal = (uint8_t)iv;
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_TINYINT], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TINYINT]);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_UTINYINT:
|
||||
if (isNullStr(pToken)) {
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
|
||||
getNullValue(TSDB_DATA_TYPE_UTINYINT), TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT], tOffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid unsigned tinyint data", pToken->z);
|
||||
} else if (!IS_VALID_UTINYINT(iv)) {
|
||||
return tscInvalidOperationMsg(msg, "unsigned tinyint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
uint8_t tmpVal = (uint8_t)iv;
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT]);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_SMALLINT:
|
||||
if (isNullStr(pToken)) {
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
|
||||
getNullValue(TSDB_DATA_TYPE_SMALLINT), TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT], tOffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid smallint data", pToken->z);
|
||||
} else if (!IS_VALID_SMALLINT(iv)) {
|
||||
return tscInvalidOperationMsg(msg, "smallint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
int16_t tmpVal = (int16_t)iv;
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT]);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_USMALLINT:
|
||||
if (isNullStr(pToken)) {
|
||||
*sizeAppend =
|
||||
tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
|
||||
getNullValue(TSDB_DATA_TYPE_USMALLINT), TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT], tOffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid unsigned smallint data", pToken->z);
|
||||
} else if (!IS_VALID_USMALLINT(iv)) {
|
||||
return tscInvalidOperationMsg(msg, "unsigned smallint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
uint16_t tmpVal = (uint16_t)iv;
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT]);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_INT:
|
||||
if (isNullStr(pToken)) {
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
|
||||
getNullValue(TSDB_DATA_TYPE_INT), TYPE_BYTES[TSDB_DATA_TYPE_INT], tOffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid int data", pToken->z);
|
||||
} else if (!IS_VALID_INT(iv)) {
|
||||
return tscInvalidOperationMsg(msg, "int data overflow", pToken->z);
|
||||
}
|
||||
|
||||
int32_t tmpVal = (int32_t)iv;
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_INT], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_INT]);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_UINT:
|
||||
if (isNullStr(pToken)) {
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
|
||||
getNullValue(TSDB_DATA_TYPE_UINT), TYPE_BYTES[TSDB_DATA_TYPE_UINT], tOffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid unsigned int data", pToken->z);
|
||||
} else if (!IS_VALID_UINT(iv)) {
|
||||
return tscInvalidOperationMsg(msg, "unsigned int data overflow", pToken->z);
|
||||
}
|
||||
|
||||
uint32_t tmpVal = (uint32_t)iv;
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_UINT], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UINT]);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_BIGINT:
|
||||
if (isNullStr(pToken)) {
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
|
||||
getNullValue(TSDB_DATA_TYPE_BIGINT), TYPE_BYTES[TSDB_DATA_TYPE_BIGINT], tOffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid bigint data", pToken->z);
|
||||
} else if (!IS_VALID_BIGINT(iv)) {
|
||||
return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &iv,
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_BIGINT], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BIGINT]);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_UBIGINT:
|
||||
if (isNullStr(pToken)) {
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
|
||||
getNullValue(TSDB_DATA_TYPE_UBIGINT), TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT], tOffset);
|
||||
} else {
|
||||
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid unsigned bigint data", pToken->z);
|
||||
} else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
|
||||
return tscInvalidOperationMsg(msg, "unsigned bigint data overflow", pToken->z);
|
||||
}
|
||||
|
||||
uint64_t tmpVal = (uint64_t)iv;
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT]);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_FLOAT:
|
||||
if (isNullStr(pToken)) {
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
|
||||
getNullValue(TSDB_DATA_TYPE_FLOAT), TYPE_BYTES[TSDB_DATA_TYPE_FLOAT], tOffset);
|
||||
} else {
|
||||
double dv;
|
||||
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
|
||||
return tscInvalidOperationMsg(msg, "illegal float data", pToken->z);
|
||||
}
|
||||
|
||||
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
|
||||
isnan(dv)) {
|
||||
return tscInvalidOperationMsg(msg, "illegal float data", pToken->z);
|
||||
}
|
||||
|
||||
float tmpVal = (float)dv;
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_FLOAT], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_FLOAT]);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_DOUBLE:
|
||||
if (isNullStr(pToken)) {
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
|
||||
getNullValue(TSDB_DATA_TYPE_DOUBLE), TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE], tOffset);
|
||||
} else {
|
||||
double dv;
|
||||
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
|
||||
return tscInvalidOperationMsg(msg, "illegal double data", pToken->z);
|
||||
}
|
||||
|
||||
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
|
||||
return tscInvalidOperationMsg(msg, "illegal double data", pToken->z);
|
||||
}
|
||||
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &dv,
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE]);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost
|
||||
if (pToken->type == TK_NULL) {
|
||||
payloadColSetId(payload, pSchema->colId);
|
||||
payloadColSetType(payload, pSchema->type);
|
||||
memcpy(POINTER_SHIFT(payloadStart, tOffset), getNullValue(TSDB_DATA_TYPE_BINARY), VARSTR_HEADER_SIZE + CHAR_BYTES);
|
||||
*sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + CHAR_BYTES);
|
||||
} else { // too long values will return invalid sql, not be truncated automatically
|
||||
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
|
||||
return tscInvalidOperationMsg(msg, "string data overflow", pToken->z);
|
||||
}
|
||||
// STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
|
||||
|
||||
payloadColSetId(payload, pSchema->colId);
|
||||
payloadColSetType(payload, pSchema->type);
|
||||
varDataSetLen(POINTER_SHIFT(payloadStart,tOffset), pToken->n);
|
||||
memcpy(varDataVal(POINTER_SHIFT(payloadStart,tOffset)), pToken->z, pToken->n);
|
||||
*sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + pToken->n);
|
||||
*dataRowColDeltaLen += (TDRowLenT)(pToken->n - CHAR_BYTES);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + VARSTR_HEADER_SIZE + pToken->n);
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
if (pToken->type == TK_NULL) {
|
||||
payloadColSetId(payload, pSchema->colId);
|
||||
payloadColSetType(payload, pSchema->type);
|
||||
memcpy(POINTER_SHIFT(payloadStart,tOffset), getNullValue(TSDB_DATA_TYPE_NCHAR), VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
|
||||
*sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
|
||||
} else {
|
||||
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
|
||||
int32_t output = 0;
|
||||
payloadColSetId(payload, pSchema->colId);
|
||||
payloadColSetType(payload, pSchema->type);
|
||||
if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(POINTER_SHIFT(payloadStart,tOffset)),
|
||||
pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
|
||||
char buf[512] = {0};
|
||||
snprintf(buf, tListLen(buf), "%s", strerror(errno));
|
||||
return tscInvalidOperationMsg(msg, buf, pToken->z);
|
||||
}
|
||||
|
||||
varDataSetLen(POINTER_SHIFT(payloadStart,tOffset), output);
|
||||
|
||||
*sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + output);
|
||||
*dataRowColDeltaLen += (TDRowLenT)(output - sizeof(uint32_t));
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + VARSTR_HEADER_SIZE + output);
|
||||
}
|
||||
break;
|
||||
|
||||
case TSDB_DATA_TYPE_TIMESTAMP: {
|
||||
if (pToken->type == TK_NULL) {
|
||||
if (primaryKey) {
|
||||
// When building SKVRow primaryKey, we should not skip even with NULL value.
|
||||
int64_t tmpVal = 0;
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, primaryKeyStart, pSchema->colId, pSchema->type, &tmpVal,
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]);
|
||||
} else {
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
|
||||
getNullValue(TSDB_DATA_TYPE_TIMESTAMP),
|
||||
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], tOffset);
|
||||
}
|
||||
} else {
|
||||
int64_t tmpVal;
|
||||
if (tsParseTime(pToken, &tmpVal, str, msg, timePrec) != TSDB_CODE_SUCCESS) {
|
||||
return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z);
|
||||
}
|
||||
|
||||
*sizeAppend = tsSetPayloadColValue(payloadStart, primaryKey ? primaryKeyStart : payload, pSchema->colId,
|
||||
pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], tOffset);
|
||||
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
/*
|
||||
* The server time/client time should not be mixed up in one sql string
|
||||
* Do not employ sort operation is not involved if server time is used.
|
||||
|
@ -777,31 +454,24 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
|||
int32_t index = 0;
|
||||
SStrToken sToken = {0};
|
||||
|
||||
SMemRowHelper *pHelper = &pDataBlocks->rowHelper;
|
||||
char * payload = pDataBlocks->pData + pDataBlocks->size;
|
||||
char *row = pDataBlocks->pData + pDataBlocks->size; // skip the SSubmitBlk header
|
||||
|
||||
SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo;
|
||||
SSchema * schema = tscGetTableSchema(pDataBlocks->pTableMeta);
|
||||
STableMeta * pTableMeta = pDataBlocks->pTableMeta;
|
||||
SSchema * schema = tscGetTableSchema(pTableMeta);
|
||||
SMemRowBuilder * pBuilder = &pDataBlocks->rowBuilder;
|
||||
int32_t dataLen = pBuilder->dataRowInitLen;
|
||||
int32_t kvLen = pBuilder->kvRowInitLen;
|
||||
bool isParseBindParam = false;
|
||||
|
||||
TDRowTLenT dataRowLen = pHelper->allNullLen;
|
||||
TDRowTLenT kvRowLen = TD_MEM_ROW_KV_VER_SIZE;
|
||||
TDRowTLenT payloadValOffset = 0;
|
||||
TDRowLenT colValOffset = 0;
|
||||
ASSERT(dataRowLen > 0);
|
||||
|
||||
payloadSetNCols(payload, spd->numOfBound);
|
||||
payloadValOffset = payloadValuesOffset(payload); // rely on payloadNCols
|
||||
// payloadSetTLen(payload, payloadValOffset);
|
||||
|
||||
char *kvPrimaryKeyStart = payload + PAYLOAD_HEADER_LEN; // primaryKey in 1st column tuple
|
||||
char *kvStart = kvPrimaryKeyStart + PAYLOAD_COL_HEAD_LEN; // the column tuple behind the primaryKey
|
||||
initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound);
|
||||
|
||||
// 1. set the parsed value from sql string
|
||||
for (int i = 0; i < spd->numOfBound; ++i) {
|
||||
// the start position in data block buffer of current value in sql
|
||||
int32_t colIndex = spd->boundedColumns[i];
|
||||
|
||||
char *start = payload + spd->cols[colIndex].offset;
|
||||
char *start = row + spd->cols[colIndex].offset;
|
||||
|
||||
SSchema *pSchema = &schema[colIndex]; // get colId here
|
||||
|
||||
|
@ -810,6 +480,9 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
|||
*str += index;
|
||||
|
||||
if (sToken.type == TK_QUESTION) {
|
||||
if (!isParseBindParam) {
|
||||
isParseBindParam = true;
|
||||
}
|
||||
if (pInsertParam->insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
|
||||
return tscSQLSyntaxErrMsg(pInsertParam->msg, "? only allowed in binding insertion", *str);
|
||||
}
|
||||
|
@ -860,54 +533,45 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
|
|||
sToken.n -= 2 + cnt;
|
||||
}
|
||||
|
||||
bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
|
||||
TDRowLenT dataRowDeltaColLen = 0; // When combine the data as SDataRow, the delta len between all NULL columns.
|
||||
TDRowLenT kvRowColLen = 0;
|
||||
TDRowLenT colValAppended = 0;
|
||||
bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
|
||||
int32_t toffset = -1;
|
||||
int16_t colId = -1;
|
||||
tscGetMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &toffset, &colId);
|
||||
|
||||
if (!IS_DATA_COL_ORDERED(spd->orderStatus)) {
|
||||
ASSERT(spd->colIdxInfo != NULL);
|
||||
if(!isPrimaryKey) {
|
||||
kvStart = POINTER_SHIFT(kvPrimaryKeyStart, spd->colIdxInfo[i].finalIdx * PAYLOAD_COL_HEAD_LEN);
|
||||
} else {
|
||||
ASSERT(spd->colIdxInfo[i].finalIdx == 0);
|
||||
}
|
||||
}
|
||||
// the primary key locates in 1st column
|
||||
int32_t ret = tsParseOneColumnKV(pSchema, &sToken, payload, kvPrimaryKeyStart, kvStart, pInsertParam->msg, str,
|
||||
isPrimaryKey, timePrec, payloadValOffset + colValOffset, &colValAppended,
|
||||
&dataRowDeltaColLen, &kvRowColLen);
|
||||
int32_t ret = tsParseOneColumnKV(pSchema, &sToken, row, pInsertParam->msg, str, isPrimaryKey, timePrec, toffset,
|
||||
colId, &dataLen, &kvLen, pBuilder->compareStat);
|
||||
if (ret != TSDB_CODE_SUCCESS) {
|
||||
return ret;
|
||||
}
|
||||
|
||||
if (isPrimaryKey) {
|
||||
if (tsCheckTimestamp(pDataBlocks, payloadValues(payload)) != TSDB_CODE_SUCCESS) {
|
||||
TSKEY tsKey = memRowKey(row);
|
||||
if (tsCheckTimestamp(pDataBlocks, (const char *)&tsKey) != TSDB_CODE_SUCCESS) {
|
||||
tscInvalidOperationMsg(pInsertParam->msg, "client time/server time can not be mixed up", sToken.z);
|
||||
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
|
||||
}
|
||||
payloadColSetOffset(kvPrimaryKeyStart, colValOffset);
|
||||
} else {
|
||||
payloadColSetOffset(kvStart, colValOffset);
|
||||
if (IS_DATA_COL_ORDERED(spd->orderStatus)) {
|
||||
kvStart += PAYLOAD_COL_HEAD_LEN; // move to next column
|
||||
}
|
||||
}
|
||||
|
||||
if (!isParseBindParam) {
|
||||
// 2. check and set convert flag
|
||||
if (pBuilder->compareStat == ROW_COMPARE_NEED) {
|
||||
checkAndConvertMemRow(row, dataLen, kvLen);
|
||||
}
|
||||
|
||||
// 3. set the null value for the columns that do not assign values
|
||||
if ((spd->numOfBound < spd->numOfCols) && isDataRow(row) && !isNeedConvertRow(row)) {
|
||||
SDataRow dataRow = memRowDataBody(row);
|
||||
for (int32_t i = 0; i < spd->numOfCols; ++i) {
|
||||
if (spd->cols[i].valStat == VAL_STAT_NONE) {
|
||||
tdAppendDataColVal(dataRow, getNullValue(schema[i].type), true, schema[i].type, spd->cols[i].toffset);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
colValOffset += colValAppended;
|
||||
kvRowLen += kvRowColLen;
|
||||
dataRowLen += dataRowDeltaColLen;
|
||||
}
|
||||
|
||||
if (kvRowLen < dataRowLen) {
|
||||
payloadSetType(payload, SMEM_ROW_KV);
|
||||
} else {
|
||||
payloadSetType(payload, SMEM_ROW_DATA);
|
||||
}
|
||||
*len = getExtendedRowSize(pDataBlocks);
|
||||
|
||||
*len = (int32_t)(payloadValOffset + colValOffset);
|
||||
payloadSetTLen(payload, *len);
|
||||
|
||||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
|
@ -957,11 +621,13 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
|
|||
|
||||
int32_t precision = tinfo.precision;
|
||||
|
||||
int32_t extendedRowSize = getExtendedRowSize(&tinfo);
|
||||
|
||||
initSMemRowHelper(&pDataBlock->rowHelper, tscGetTableSchema(pDataBlock->pTableMeta),
|
||||
tscGetNumOfColumns(pDataBlock->pTableMeta), 0);
|
||||
int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
|
||||
|
||||
if (TSDB_CODE_SUCCESS !=
|
||||
(code = initMemRowBuilder(&pDataBlock->rowBuilder, 0, tinfo.numOfColumns, pDataBlock->boundColumnInfo.numOfBound,
|
||||
pDataBlock->boundColumnInfo.allNullLen))) {
|
||||
return code;
|
||||
}
|
||||
while (1) {
|
||||
index = 0;
|
||||
sToken = tStrGetToken(*str, &index, false);
|
||||
|
@ -1012,19 +678,37 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
|
|||
void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols) {
|
||||
pColInfo->numOfCols = numOfCols;
|
||||
pColInfo->numOfBound = numOfCols;
|
||||
pColInfo->orderStatus = ORDER_STATUS_ORDERED;
|
||||
pColInfo->orderStatus = ORDER_STATUS_ORDERED; // default is ORDERED for non-bound mode
|
||||
pColInfo->boundedColumns = calloc(pColInfo->numOfCols, sizeof(int32_t));
|
||||
pColInfo->cols = calloc(pColInfo->numOfCols, sizeof(SBoundColumn));
|
||||
pColInfo->colIdxInfo = NULL;
|
||||
pColInfo->flen = 0;
|
||||
pColInfo->allNullLen = 0;
|
||||
|
||||
int32_t nVar = 0;
|
||||
for (int32_t i = 0; i < pColInfo->numOfCols; ++i) {
|
||||
uint8_t type = pSchema[i].type;
|
||||
if (i > 0) {
|
||||
pColInfo->cols[i].offset = pSchema[i - 1].bytes + pColInfo->cols[i - 1].offset;
|
||||
pColInfo->cols[i].toffset = pColInfo->flen;
|
||||
}
|
||||
pColInfo->flen += TYPE_BYTES[type];
|
||||
switch (type) {
|
||||
case TSDB_DATA_TYPE_BINARY:
|
||||
pColInfo->allNullLen += (VARSTR_HEADER_SIZE + CHAR_BYTES);
|
||||
++nVar;
|
||||
break;
|
||||
case TSDB_DATA_TYPE_NCHAR:
|
||||
pColInfo->allNullLen += (VARSTR_HEADER_SIZE + TSDB_NCHAR_SIZE);
|
||||
++nVar;
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
|
||||
pColInfo->cols[i].hasVal = true;
|
||||
pColInfo->boundedColumns[i] = i;
|
||||
}
|
||||
pColInfo->allNullLen += pColInfo->flen;
|
||||
pColInfo->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT));
|
||||
}
|
||||
|
||||
int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) {
|
||||
|
@ -1124,35 +808,29 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk
|
|||
if (dataBuf->tsSource == TSDB_USE_SERVER_TS) {
|
||||
assert(dataBuf->ordered);
|
||||
}
|
||||
// allocate memory
|
||||
// allocate memory
|
||||
size_t nAlloc = nRows * sizeof(SBlockKeyTuple);
|
||||
if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) {
|
||||
size_t nRealAlloc = nAlloc + 10 * sizeof(SBlockKeyTuple);
|
||||
char * tmp = trealloc(pBlkKeyInfo->pKeyTuple, nRealAlloc);
|
||||
if (tmp == NULL) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple *)tmp;
|
||||
pBlkKeyInfo->maxBytesAlloc = (int32_t)nRealAlloc;
|
||||
}
|
||||
memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc);
|
||||
|
||||
int32_t extendedRowSize = getExtendedRowSize(dataBuf);
|
||||
SBlockKeyTuple *pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
|
||||
char * pBlockData = pBlocks->data;
|
||||
TDRowTLenT totolPayloadTLen = 0;
|
||||
TDRowTLenT payloadTLen = 0;
|
||||
int n = 0;
|
||||
while (n < nRows) {
|
||||
pBlkKeyTuple->skey = payloadTSKey(pBlockData);
|
||||
pBlkKeyTuple->skey = memRowKey(pBlockData);
|
||||
pBlkKeyTuple->payloadAddr = pBlockData;
|
||||
payloadTLen = payloadTLen(pBlockData);
|
||||
#if 0
|
||||
ASSERT(payloadNCols(pBlockData) <= 4096);
|
||||
ASSERT(payloadTLen(pBlockData) < 65536);
|
||||
#endif
|
||||
totolPayloadTLen += payloadTLen;
|
||||
|
||||
// next loop
|
||||
pBlockData += payloadTLen;
|
||||
pBlockData += extendedRowSize;
|
||||
++pBlkKeyTuple;
|
||||
++n;
|
||||
}
|
||||
|
@ -1169,7 +847,6 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk
|
|||
TSKEY tj = (pBlkKeyTuple + j)->skey;
|
||||
|
||||
if (ti == tj) {
|
||||
totolPayloadTLen -= payloadTLen(pBlkKeyTuple + j);
|
||||
++j;
|
||||
continue;
|
||||
}
|
||||
|
@ -1185,17 +862,15 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk
|
|||
pBlocks->numOfRows = i + 1;
|
||||
}
|
||||
|
||||
dataBuf->size = sizeof(SSubmitBlk) + totolPayloadTLen;
|
||||
dataBuf->size = sizeof(SSubmitBlk) + pBlocks->numOfRows * extendedRowSize;
|
||||
dataBuf->prevTS = INT64_MIN;
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
static int32_t doParseInsertStatement(SInsertStatementParam *pInsertParam, char **str, STableDataBlocks* dataBuf, int32_t *totalNum) {
|
||||
STableComInfo tinfo = tscGetTableInfo(dataBuf->pTableMeta);
|
||||
|
||||
static int32_t doParseInsertStatement(SInsertStatementParam *pInsertParam, char **str, STableDataBlocks* dataBuf, int32_t *totalNum) {
|
||||
int32_t maxNumOfRows;
|
||||
int32_t code = tscAllocateMemIfNeed(dataBuf, getExtendedRowSize(&tinfo), &maxNumOfRows);
|
||||
int32_t code = tscAllocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows);
|
||||
if (TSDB_CODE_SUCCESS != code) {
|
||||
return TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -1533,7 +1208,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
|
|||
pColInfo->numOfBound = 0;
|
||||
memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * nCols);
|
||||
for (int32_t i = 0; i < nCols; ++i) {
|
||||
pColInfo->cols[i].hasVal = false;
|
||||
pColInfo->cols[i].valStat = VAL_STAT_NONE;
|
||||
}
|
||||
|
||||
int32_t code = TSDB_CODE_SUCCESS;
|
||||
|
@ -1572,12 +1247,12 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
|
|||
int32_t nScanned = 0, t = lastColIdx + 1;
|
||||
while (t < nCols) {
|
||||
if (strncmp(sToken.z, pSchema[t].name, sToken.n) == 0 && strlen(pSchema[t].name) == sToken.n) {
|
||||
if (pColInfo->cols[t].hasVal == true) {
|
||||
if (pColInfo->cols[t].valStat == VAL_STAT_HAS) {
|
||||
code = tscInvalidOperationMsg(pInsertParam->msg, "duplicated column name", sToken.z);
|
||||
goto _clean;
|
||||
}
|
||||
|
||||
pColInfo->cols[t].hasVal = true;
|
||||
pColInfo->cols[t].valStat = VAL_STAT_HAS;
|
||||
pColInfo->boundedColumns[pColInfo->numOfBound] = t;
|
||||
++pColInfo->numOfBound;
|
||||
findColumnIndex = true;
|
||||
|
@ -1595,12 +1270,12 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
|
|||
int32_t nRemain = nCols - nScanned;
|
||||
while (t < nRemain) {
|
||||
if (strncmp(sToken.z, pSchema[t].name, sToken.n) == 0 && strlen(pSchema[t].name) == sToken.n) {
|
||||
if (pColInfo->cols[t].hasVal == true) {
|
||||
if (pColInfo->cols[t].valStat == VAL_STAT_HAS) {
|
||||
code = tscInvalidOperationMsg(pInsertParam->msg, "duplicated column name", sToken.z);
|
||||
goto _clean;
|
||||
}
|
||||
|
||||
pColInfo->cols[t].hasVal = true;
|
||||
pColInfo->cols[t].valStat = VAL_STAT_HAS;
|
||||
pColInfo->boundedColumns[pColInfo->numOfBound] = t;
|
||||
++pColInfo->numOfBound;
|
||||
findColumnIndex = true;
|
||||
|
@ -1835,7 +1510,7 @@ int tsParseInsertSql(SSqlObj *pSql) {
|
|||
goto _clean;
|
||||
}
|
||||
|
||||
if (dataBuf->boundColumnInfo.cols[0].hasVal == false) {
|
||||
if (dataBuf->boundColumnInfo.cols[0].valStat == VAL_STAT_NONE) {
|
||||
code = tscInvalidOperationMsg(pInsertParam->msg, "primary timestamp column can not be null", NULL);
|
||||
goto _clean;
|
||||
}
|
||||
|
@ -2046,15 +1721,18 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
|
|||
goto _error;
|
||||
}
|
||||
|
||||
tscAllocateMemIfNeed(pTableDataBlock, getExtendedRowSize(&tinfo), &maxRows);
|
||||
tscAllocateMemIfNeed(pTableDataBlock, getExtendedRowSize(pTableDataBlock), &maxRows);
|
||||
tokenBuf = calloc(1, TSDB_MAX_BYTES_PER_ROW);
|
||||
if (tokenBuf == NULL) {
|
||||
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
|
||||
goto _error;
|
||||
}
|
||||
|
||||
initSMemRowHelper(&pTableDataBlock->rowHelper, tscGetTableSchema(pTableDataBlock->pTableMeta),
|
||||
tscGetNumOfColumns(pTableDataBlock->pTableMeta), 0);
|
||||
if (TSDB_CODE_SUCCESS !=
|
||||
(ret = initMemRowBuilder(&pTableDataBlock->rowBuilder, 0, tinfo.numOfColumns, pTableDataBlock->numOfParams,
|
||||
pTableDataBlock->boundColumnInfo.allNullLen))) {
|
||||
goto _error;
|
||||
}
|
||||
|
||||
while ((readLen = tgetline(&line, &n, fp)) != -1) {
|
||||
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {
|
||||
|
|
|
@ -299,7 +299,7 @@ static int fillColumnsNull(STableDataBlocks* pBlock, int32_t rowNum) {
|
|||
SSchema *schema = (SSchema*)pBlock->pTableMeta->schema;
|
||||
|
||||
for (int32_t i = 0; i < spd->numOfCols; ++i) {
|
||||
if (!spd->cols[i].hasVal) { // current column do not have any value to insert, set it to null
|
||||
if (spd->cols[i].valStat == VAL_STAT_NONE) { // current column do not have any value to insert, set it to null
|
||||
for (int32_t n = 0; n < rowNum; ++n) {
|
||||
char *ptr = pBlock->pData + sizeof(SSubmitBlk) + pBlock->rowSize * n + offset;
|
||||
|
||||
|
|
|
@ -1808,101 +1808,6 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
|
|||
return TSDB_CODE_SUCCESS;
|
||||
}
|
||||
|
||||
static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
|
||||
SSchema* pSchema = pBuilder->pSchema;
|
||||
char* p = (char*)pBuilder->buf;
|
||||
int toffset = 0;
|
||||
uint16_t nCols = pBuilder->nCols;
|
||||
|
||||
uint8_t memRowType = payloadType(p);
|
||||
uint16_t nColsBound = payloadNCols(p);
|
||||
if (pBuilder->nCols <= 0 || nColsBound <= 0) {
|
||||
return NULL;
|
||||
}
|
||||
char* pVals = POINTER_SHIFT(p, payloadValuesOffset(p));
|
||||
SMemRow* memRow = (SMemRow)pBuilder->pDataBlock;
|
||||
memRowSetType(memRow, memRowType);
|
||||
|
||||
// ----------------- Raw payload structure for row:
|
||||
/* |<------------ Head ------------->|<----------- body of column data tuple ------------------->|
|
||||
* | |<----------------- flen ------------->|<--- value part --->|
|
||||
* |SMemRowType| dataTLen | nCols | colId | colType | offset | ... | value |...|...|... |
|
||||
* +-----------+----------+----------+--------------------------------------|--------------------|
|
||||
* | uint8_t | uint32_t | uint16_t | int16_t | uint8_t | uint16_t | ... |.......|...|...|... |
|
||||
* +-----------+----------+----------+--------------------------------------+--------------------|
|
||||
* 1. offset in column data tuple starts from the value part in case of uint16_t overflow.
|
||||
* 2. dataTLen: total length including the header and body.
|
||||
*/
|
||||
|
||||
if (memRowType == SMEM_ROW_DATA) {
|
||||
SDataRow trow = (SDataRow)memRowDataBody(memRow);
|
||||
dataRowSetLen(trow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen));
|
||||
dataRowSetVersion(trow, pBuilder->sversion);
|
||||
|
||||
p = (char*)payloadBody(pBuilder->buf);
|
||||
uint16_t i = 0, j = 0;
|
||||
while (j < nCols) {
|
||||
if (i >= nColsBound) {
|
||||
break;
|
||||
}
|
||||
int16_t colId = payloadColId(p);
|
||||
if (colId == pSchema[j].colId) {
|
||||
// ASSERT(payloadColType(p) == pSchema[j].type);
|
||||
tdAppendColVal(trow, POINTER_SHIFT(pVals, payloadColOffset(p)), pSchema[j].type, toffset);
|
||||
toffset += TYPE_BYTES[pSchema[j].type];
|
||||
p = payloadNextCol(p);
|
||||
++i;
|
||||
++j;
|
||||
} else if (colId < pSchema[j].colId) {
|
||||
p = payloadNextCol(p);
|
||||
++i;
|
||||
} else {
|
||||
tdAppendColVal(trow, getNullValue(pSchema[j].type), pSchema[j].type, toffset);
|
||||
toffset += TYPE_BYTES[pSchema[j].type];
|
||||
++j;
|
||||
}
|
||||
}
|
||||
|
||||
while (j < nCols) {
|
||||
tdAppendColVal(trow, getNullValue(pSchema[j].type), pSchema[j].type, toffset);
|
||||
toffset += TYPE_BYTES[pSchema[j].type];
|
||||
++j;
|
||||
}
|
||||
|
||||
#if 0 // no need anymore
|
||||
while (i < nColsBound) {
|
||||
p = payloadNextCol(p);
|
||||
++i;
|
||||
}
|
||||
#endif
|
||||
|
||||
} else if (memRowType == SMEM_ROW_KV) {
|
||||
SKVRow kvRow = (SKVRow)memRowKvBody(memRow);
|
||||
kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nColsBound));
|
||||
kvRowSetNCols(kvRow, nColsBound);
|
||||
memRowSetKvVersion(memRow, pBuilder->sversion);
|
||||
|
||||
p = (char*)payloadBody(pBuilder->buf);
|
||||
int i = 0;
|
||||
while (i < nColsBound) {
|
||||
int16_t colId = payloadColId(p);
|
||||
uint8_t colType = payloadColType(p);
|
||||
tdAppendKvColVal(kvRow, POINTER_SHIFT(pVals,payloadColOffset(p)), colId, colType, &toffset);
|
||||
//toffset += sizeof(SColIdx);
|
||||
p = payloadNextCol(p);
|
||||
++i;
|
||||
}
|
||||
|
||||
} else {
|
||||
ASSERT(0);
|
||||
}
|
||||
int32_t rowTLen = memRowTLen(memRow);
|
||||
pBuilder->pDataBlock = (char*)pBuilder->pDataBlock + rowTLen; // next row
|
||||
pBuilder->pSubmitBlk->dataLen += rowTLen;
|
||||
|
||||
return memRow;
|
||||
}
|
||||
|
||||
// Erase the empty space reserved for binary data
|
||||
static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SInsertStatementParam* insertParam,
|
||||
SBlockKeyTuple* blkKeyTuple) {
|
||||
|
@ -1934,10 +1839,11 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI
|
|||
int32_t schemaSize = sizeof(STColumn) * numOfCols;
|
||||
pBlock->schemaLen = schemaSize;
|
||||
} else {
|
||||
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
|
||||
flen += TYPE_BYTES[pSchema[j].type];
|
||||
if (IS_RAW_PAYLOAD(insertParam->payloadType)) {
|
||||
for (int32_t j = 0; j < tinfo.numOfColumns; ++j) {
|
||||
flen += TYPE_BYTES[pSchema[j].type];
|
||||
}
|
||||
}
|
||||
|
||||
pBlock->schemaLen = 0;
|
||||
}
|
||||
|
||||
|
@ -1964,18 +1870,19 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI
|
|||
pBlock->dataLen += memRowTLen(memRow);
|
||||
}
|
||||
} else {
|
||||
SMemRowBuilder rowBuilder;
|
||||
rowBuilder.pSchema = pSchema;
|
||||
rowBuilder.sversion = pTableMeta->sversion;
|
||||
rowBuilder.flen = flen;
|
||||
rowBuilder.nCols = tinfo.numOfColumns;
|
||||
rowBuilder.pDataBlock = pDataBlock;
|
||||
rowBuilder.pSubmitBlk = pBlock;
|
||||
rowBuilder.buf = p;
|
||||
|
||||
for (int32_t i = 0; i < numOfRows; ++i) {
|
||||
rowBuilder.buf = (blkKeyTuple + i)->payloadAddr;
|
||||
tdGenMemRowFromBuilder(&rowBuilder);
|
||||
char* payload = (blkKeyTuple + i)->payloadAddr;
|
||||
if (isNeedConvertRow(payload)) {
|
||||
convertSMemRow(pDataBlock, payload, pTableDataBlock);
|
||||
TDRowTLenT rowTLen = memRowTLen(pDataBlock);
|
||||
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||
pBlock->dataLen += rowTLen;
|
||||
} else {
|
||||
TDRowTLenT rowTLen = memRowTLen(payload);
|
||||
memcpy(pDataBlock, payload, rowTLen);
|
||||
pDataBlock = POINTER_SHIFT(pDataBlock, rowTLen);
|
||||
pBlock->dataLen += rowTLen;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1988,9 +1895,9 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SI
|
|||
|
||||
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
|
||||
int32_t result = TD_MEM_ROW_DATA_HEAD_SIZE;
|
||||
int32_t columns = tscGetNumOfColumns(pTableMeta);
|
||||
int32_t columns = tscGetNumOfColumns(pTableMeta);
|
||||
SSchema* pSchema = tscGetTableSchema(pTableMeta);
|
||||
for(int32_t i = 0; i < columns; i++) {
|
||||
for (int32_t i = 0; i < columns; i++) {
|
||||
if (IS_VAR_DATA_TYPE((pSchema + i)->type)) {
|
||||
result += TYPE_BYTES[TSDB_DATA_TYPE_BINARY];
|
||||
}
|
||||
|
@ -2036,7 +1943,7 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
|
|||
SSubmitBlk* pBlocks = (SSubmitBlk*) pOneTableBlock->pData;
|
||||
if (pBlocks->numOfRows > 0) {
|
||||
// the maximum expanded size in byte when a row-wise data is converted to SDataRow format
|
||||
int32_t expandSize = getRowExpandSize(pOneTableBlock->pTableMeta);
|
||||
int32_t expandSize = isRawPayload ? getRowExpandSize(pOneTableBlock->pTableMeta) : 0;
|
||||
STableDataBlocks* dataBuf = NULL;
|
||||
|
||||
int32_t ret = tscGetDataBlockFromList(pVnodeDataBlockHashList, pOneTableBlock->vgId, TSDB_PAYLOAD_SIZE,
|
||||
|
@ -2049,7 +1956,8 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
|
|||
return ret;
|
||||
}
|
||||
|
||||
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
|
||||
int64_t destSize = dataBuf->size + pOneTableBlock->size + pBlocks->numOfRows * expandSize +
|
||||
sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
|
||||
|
||||
if (dataBuf->nAllocSize < destSize) {
|
||||
dataBuf->nAllocSize = (uint32_t)(destSize * 1.5);
|
||||
|
@ -2093,7 +2001,9 @@ int32_t tscMergeTableDataBlocks(SInsertStatementParam *pInsertParam, bool freeBl
|
|||
pBlocks->numOfRows, pBlocks->sversion, blkKeyInfo.pKeyTuple->skey, pLastKeyTuple->skey);
|
||||
}
|
||||
|
||||
int32_t len = pBlocks->numOfRows * (pOneTableBlock->rowSize + expandSize) + sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
|
||||
int32_t len = pBlocks->numOfRows *
|
||||
(isRawPayload ? (pOneTableBlock->rowSize + expandSize) : getExtendedRowSize(pOneTableBlock)) +
|
||||
sizeof(STColumn) * tscGetNumOfColumns(pOneTableBlock->pTableMeta);
|
||||
|
||||
pBlocks->tid = htonl(pBlocks->tid);
|
||||
pBlocks->uid = htobe64(pBlocks->uid);
|
||||
|
|
|
@ -186,6 +186,7 @@ typedef void *SDataRow;
|
|||
#define TD_DATA_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t))
|
||||
|
||||
#define dataRowLen(r) (*(TDRowLenT *)(r)) // 0~65535
|
||||
#define dataRowEnd(r) POINTER_SHIFT(r, dataRowLen(r))
|
||||
#define dataRowVersion(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)))
|
||||
#define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE)
|
||||
#define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r)))
|
||||
|
@ -201,14 +202,18 @@ void tdFreeDataRow(SDataRow row);
|
|||
void tdInitDataRow(SDataRow row, STSchema *pSchema);
|
||||
SDataRow tdDataRowDup(SDataRow row);
|
||||
|
||||
|
||||
// offset here not include dataRow header length
|
||||
static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t offset) {
|
||||
static FORCE_INLINE int tdAppendDataColVal(SDataRow row, const void *value, bool isCopyVarData, int8_t type,
|
||||
int32_t offset) {
|
||||
ASSERT(value != NULL);
|
||||
int32_t toffset = offset + TD_DATA_ROW_HEAD_SIZE;
|
||||
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
*(VarDataOffsetT *)POINTER_SHIFT(row, toffset) = dataRowLen(row);
|
||||
memcpy(POINTER_SHIFT(row, dataRowLen(row)), value, varDataTLen(value));
|
||||
if (isCopyVarData) {
|
||||
memcpy(POINTER_SHIFT(row, dataRowLen(row)), value, varDataTLen(value));
|
||||
}
|
||||
dataRowLen(row) += varDataTLen(value);
|
||||
} else {
|
||||
if (offset == 0) {
|
||||
|
@ -223,6 +228,12 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t t
|
|||
return 0;
|
||||
}
|
||||
|
||||
|
||||
// offset here not include dataRow header length
|
||||
static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t offset) {
|
||||
return tdAppendDataColVal(row, value, true, type, offset);
|
||||
}
|
||||
|
||||
// NOTE: offset here including the header size
|
||||
static FORCE_INLINE void *tdGetRowDataOfCol(SDataRow row, int8_t type, int32_t offset) {
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
|
@ -472,9 +483,10 @@ static FORCE_INLINE void *tdGetKVRowIdxOfCol(SKVRow row, int16_t colId) {
|
|||
}
|
||||
|
||||
// offset here not include kvRow header length
|
||||
static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t colId, int8_t type, int32_t *offset) {
|
||||
static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, bool isCopyValData, int16_t colId, int8_t type,
|
||||
int32_t offset) {
|
||||
ASSERT(value != NULL);
|
||||
int32_t toffset = *offset + TD_KV_ROW_HEAD_SIZE;
|
||||
int32_t toffset = offset + TD_KV_ROW_HEAD_SIZE;
|
||||
SColIdx *pColIdx = (SColIdx *)POINTER_SHIFT(row, toffset);
|
||||
char * ptr = (char *)POINTER_SHIFT(row, kvRowLen(row));
|
||||
|
||||
|
@ -482,10 +494,12 @@ static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t
|
|||
pColIdx->offset = kvRowLen(row); // offset of pColIdx including the TD_KV_ROW_HEAD_SIZE
|
||||
|
||||
if (IS_VAR_DATA_TYPE(type)) {
|
||||
memcpy(ptr, value, varDataTLen(value));
|
||||
if (isCopyValData) {
|
||||
memcpy(ptr, value, varDataTLen(value));
|
||||
}
|
||||
kvRowLen(row) += varDataTLen(value);
|
||||
} else {
|
||||
if (*offset == 0) {
|
||||
if (offset == 0) {
|
||||
ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP);
|
||||
TKEY tvalue = tdGetTKEY(*(TSKEY *)value);
|
||||
memcpy(ptr, (void *)(&tvalue), TYPE_BYTES[type]);
|
||||
|
@ -494,7 +508,6 @@ static FORCE_INLINE int tdAppendKvColVal(SKVRow row, const void *value, int16_t
|
|||
}
|
||||
kvRowLen(row) += TYPE_BYTES[type];
|
||||
}
|
||||
*offset += sizeof(SColIdx);
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
@ -589,12 +602,24 @@ typedef void *SMemRow;
|
|||
#define TD_MEM_ROW_DATA_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_DATA_ROW_HEAD_SIZE)
|
||||
#define TD_MEM_ROW_KV_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE + TD_KV_ROW_HEAD_SIZE)
|
||||
|
||||
#define SMEM_ROW_DATA 0U // SDataRow
|
||||
#define SMEM_ROW_KV 1U // SKVRow
|
||||
#define SMEM_ROW_DATA 0x0U // SDataRow
|
||||
#define SMEM_ROW_KV 0x01U // SKVRow
|
||||
#define SMEM_ROW_CONVERT 0x80U // SMemRow convert flag
|
||||
|
||||
#define memRowType(r) (*(uint8_t *)(r))
|
||||
#define KVRatioKV (0.2f) // all bool
|
||||
#define KVRatioPredict (0.4f)
|
||||
#define KVRatioData (0.75f) // all bigint
|
||||
#define KVRatioConvert (0.9f)
|
||||
|
||||
#define memRowType(r) ((*(uint8_t *)(r)) & 0x01)
|
||||
|
||||
#define memRowSetType(r, t) ((*(uint8_t *)(r)) = (t)) // set the total byte in case of dirty memory
|
||||
#define memRowSetConvert(r) ((*(uint8_t *)(r)) = (((*(uint8_t *)(r)) & 0x7F) | SMEM_ROW_CONVERT)) // highest bit
|
||||
#define isDataRowT(t) (SMEM_ROW_DATA == (((uint8_t)(t)) & 0x01))
|
||||
#define isDataRow(r) (SMEM_ROW_DATA == memRowType(r))
|
||||
#define isKvRowT(t) (SMEM_ROW_KV == (((uint8_t)(t)) & 0x01))
|
||||
#define isKvRow(r) (SMEM_ROW_KV == memRowType(r))
|
||||
#define isNeedConvertRow(r) (((*(uint8_t *)(r)) & 0x80) == SMEM_ROW_CONVERT)
|
||||
|
||||
#define memRowDataBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) // section after flag
|
||||
#define memRowKvBody(r) \
|
||||
|
@ -611,6 +636,14 @@ typedef void *SMemRow;
|
|||
#define memRowLen(r) (isDataRow(r) ? memRowDataLen(r) : memRowKvLen(r))
|
||||
#define memRowTLen(r) (isDataRow(r) ? memRowDataTLen(r) : memRowKvTLen(r)) // using uint32_t/int32_t to store the TLen
|
||||
|
||||
static FORCE_INLINE char *memRowEnd(SMemRow row) {
|
||||
if (isDataRow(row)) {
|
||||
return (char *)dataRowEnd(memRowDataBody(row));
|
||||
} else {
|
||||
return (char *)kvRowEnd(memRowKvBody(row));
|
||||
}
|
||||
}
|
||||
|
||||
#define memRowDataVersion(r) dataRowVersion(memRowDataBody(r))
|
||||
#define memRowKvVersion(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE))
|
||||
#define memRowVersion(r) (isDataRow(r) ? memRowDataVersion(r) : memRowKvVersion(r)) // schema version
|
||||
|
@ -628,7 +661,6 @@ typedef void *SMemRow;
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
#define memRowSetType(r, t) (memRowType(r) = (t))
|
||||
#define memRowSetLen(r, l) (isDataRow(r) ? memRowDataLen(r) = (l) : memRowKvLen(r) = (l))
|
||||
#define memRowSetVersion(r, v) (isDataRow(r) ? dataRowSetVersion(memRowDataBody(r), v) : memRowSetKvVersion(r, v))
|
||||
#define memRowCpy(dst, r) memcpy((dst), (r), memRowTLen(r))
|
||||
|
@ -661,12 +693,12 @@ static FORCE_INLINE void *tdGetMemRowDataOfColEx(void *row, int16_t colId, int8_
|
|||
}
|
||||
}
|
||||
|
||||
static FORCE_INLINE int tdAppendMemColVal(SMemRow row, const void *value, int16_t colId, int8_t type, int32_t offset,
|
||||
int32_t *kvOffset) {
|
||||
static FORCE_INLINE int tdAppendMemRowColVal(SMemRow row, const void *value, bool isCopyVarData, int16_t colId,
|
||||
int8_t type, int32_t offset) {
|
||||
if (isDataRow(row)) {
|
||||
tdAppendColVal(memRowDataBody(row), value, type, offset);
|
||||
tdAppendDataColVal(memRowDataBody(row), value, isCopyVarData, type, offset);
|
||||
} else {
|
||||
tdAppendKvColVal(memRowKvBody(row), value, colId, type, kvOffset);
|
||||
tdAppendKvColVal(memRowKvBody(row), value, isCopyVarData, colId, type, offset);
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
@ -688,6 +720,30 @@ static FORCE_INLINE int32_t tdGetColAppendLen(uint8_t rowType, const void *value
|
|||
return len;
|
||||
}
|
||||
|
||||
/**
|
||||
* 1. calculate the delta of AllNullLen for SDataRow.
|
||||
* 2. calculate the real len for SKVRow.
|
||||
*/
|
||||
static FORCE_INLINE void tdGetColAppendDeltaLen(const void *value, int8_t colType, int32_t *dataLen, int32_t *kvLen) {
|
||||
switch (colType) {
|
||||
case TSDB_DATA_TYPE_BINARY: {
|
||||
int32_t varLen = varDataLen(value);
|
||||
*dataLen += (varLen - CHAR_BYTES);
|
||||
*kvLen += (varLen + sizeof(SColIdx));
|
||||
break;
|
||||
}
|
||||
case TSDB_DATA_TYPE_NCHAR: {
|
||||
int32_t varLen = varDataLen(value);
|
||||
*dataLen += (varLen - TSDB_NCHAR_SIZE);
|
||||
*kvLen += (varLen + sizeof(SColIdx));
|
||||
break;
|
||||
}
|
||||
default: {
|
||||
*kvLen += (TYPE_BYTES[colType] + sizeof(SColIdx));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
typedef struct {
|
||||
int16_t colId;
|
||||
|
@ -703,7 +759,7 @@ static FORCE_INLINE void setSColInfo(SColInfo* colInfo, int16_t colId, uint8_t c
|
|||
|
||||
SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSchema1, STSchema *pSchema2);
|
||||
|
||||
|
||||
#if 0
|
||||
// ----------------- Raw payload structure for row:
|
||||
/* |<------------ Head ------------->|<----------- body of column data tuple ------------------->|
|
||||
* | |<----------------- flen ------------->|<--- value part --->|
|
||||
|
@ -749,6 +805,8 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch
|
|||
|
||||
static FORCE_INLINE char *payloadNextCol(char *pCol) { return (char *)POINTER_SHIFT(pCol, PAYLOAD_COL_HEAD_LEN); }
|
||||
|
||||
#endif
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
#endif
|
||||
|
|
|
@ -851,7 +851,8 @@ SMemRow mergeTwoMemRows(void *buffer, SMemRow row1, SMemRow row2, STSchema *pSch
|
|||
int16_t k;
|
||||
for (k = 0; k < nKvNCols; ++k) {
|
||||
SColInfo *pColInfo = taosArrayGet(stashRow, k);
|
||||
tdAppendKvColVal(kvRow, pColInfo->colVal, pColInfo->colId, pColInfo->colType, &toffset);
|
||||
tdAppendKvColVal(kvRow, pColInfo->colVal, true, pColInfo->colId, pColInfo->colType, toffset);
|
||||
toffset += sizeof(SColIdx);
|
||||
}
|
||||
ASSERT(kvLen == memRowTLen(tRow));
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue