Refactor payload to support up to 4096 columns

This commit is contained in:
Cary Xu 2021-07-11 14:44:13 +08:00
parent 1236681fa7
commit 1cd331a376
8 changed files with 365 additions and 235 deletions

View File

@ -40,9 +40,6 @@ extern "C" {
#define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \
(((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE))
#define KvRowNColsThresh 1 // default 1200
#define KVRowRatio 0.85 // for NonVarType, we get value from SDataRow directly, while needs readdressing for SKVRow
#pragma pack(push,1)
// this struct is transferred as binary, padding two bytes to avoid
// a 'uid' whose low bytes are 0xff being recognized as NULL,

View File

@ -41,6 +41,9 @@ extern "C" {
// forward declaration
struct SSqlInfo;
#define KvRowNColsThresh 128 // default 128
// KVRowRatio: a parsed row is marked SMEM_ROW_KV only when kvRowLen < dataRowLen * KVRowRatio;
// otherwise it is marked SMEM_ROW_DATA. The bias toward SDataRow exists because non-variable-length
// values can be read from an SDataRow directly, whereas an SKVRow needs re-addressing.
#define KVRowRatio 0.9 // for NonVarType, we get value from SDataRow directly, while SKVRow needs re-addressing
typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows);
typedef struct SNewVgroupInfo {
@ -87,12 +90,18 @@ typedef struct SBoundColumn {
bool hasVal; // denote if current column has bound or not
int32_t offset; // all column offset value
} SBoundColumn;
// Index record for one bound column; the array of these is built by parseBoundColumns()
// only when the bound column list is not in schema order.
// NOTE: schemaIdxCompar() reads the first uint16_t of an element and boundIdxCompar() reads
// the uint16_t at offset sizeof(uint16_t), so the order of the first two fields must not change.
typedef struct {
uint16_t schemaColIdx; // index of the column in the table schema (copied from boundedColumns[i])
uint16_t boundIdx; // position of the column in the bound column list
uint16_t finalIdx; // rank of the column once the records are sorted by schemaColIdx
} SBoundIdxInfo;
typedef struct SParsedDataColInfo {
int16_t numOfCols;
int16_t numOfBound;
int32_t *boundedColumns;
SBoundColumn *cols;
bool isOrdered; // bounded columns
int16_t numOfCols;
int16_t numOfBound;
int32_t * boundedColumns; // bounded column idx according to schema
SBoundColumn * cols;
SBoundIdxInfo *colIdxInfo;
} SParsedDataColInfo;
typedef struct {
@ -107,9 +116,12 @@ typedef struct {
void * pDataBlock;
SSubmitBlk *pSubmitBlk;
uint16_t allNullLen;
} SMemRowBuilder;
// Helper state kept per STableDataBlocks while parsing rows; filled by initSMemRowHelper().
typedef struct {
// Baseline row length: sum of TYPE_BYTES for every schema column, plus
// sizeof(VarDataLenT) + CHAR_BYTES for each BINARY column and
// sizeof(VarDataLenT) + TSDB_NCHAR_SIZE for each NCHAR column.
// tsParseOneRow() uses it as the starting value of dataRowLen.
TDRowLenT allNullLen;
} SMemRowHelper;
typedef struct STableDataBlocks {
SName tableName;
int8_t tsSource; // where does the UNIX timestamp come from, server or client
@ -130,7 +142,7 @@ typedef struct STableDataBlocks {
uint32_t numOfAllocedParams;
uint32_t numOfParams;
SParamInfo * params;
SMemRowBuilder rowBuilder;
SMemRowHelper rowHelper;
} STableDataBlocks;
typedef struct {
@ -398,7 +410,7 @@ extern int tscRefId;
extern int tscNumOfObj; // number of existed sqlObj in current process.
extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo);
void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables);
int16_t getNewResColId(SSqlCmd* pCmd);
@ -406,4 +418,4 @@ int16_t getNewResColId(SSqlCmd* pCmd);
}
#endif
#endif
#endif

View File

@ -46,23 +46,22 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
char *str, char **end);
static FORCE_INLINE int32_t getExtendedRowSize(STableComInfo *tinfo) {
return tinfo->rowSize + PAYLOAD_HEADER_LEN + PAYLOAD_ID_TYPE_LEN * tinfo->numOfColumns;
return tinfo->rowSize + PAYLOAD_HEADER_LEN + PAYLOAD_COL_HEAD_LEN * tinfo->numOfColumns;
}
int initSMemRowBuilder(SMemRowBuilder *pBuilder, SSchema *pSSchema, uint16_t nCols, uint16_t allNullColsLen) {
int initSMemRowHelper(SMemRowHelper *pHelper, SSchema *pSSchema, uint16_t nCols, uint16_t allNullColsLen) {
ASSERT(nCols > 0);
pBuilder->pSchema = pSSchema;
pBuilder->allNullLen = allNullColsLen; // TODO: get allNullColsLen when creating or altering table meta
if (pBuilder->allNullLen == 0) {
pHelper->allNullLen = allNullColsLen; // TODO: get allNullColsLen when creating or altering table meta
if (pHelper->allNullLen == 0) {
for (uint16_t i = 0; i < nCols; ++i) {
uint8_t type = pSSchema[i].type;
int32_t typeLen = TYPE_BYTES[type];
ASSERT(typeLen > 0);
pBuilder->allNullLen += typeLen;
pHelper->allNullLen += typeLen;
if (TSDB_DATA_TYPE_BINARY == type) {
pBuilder->allNullLen += (sizeof(VarDataLenT) + CHAR_BYTES);
pHelper->allNullLen += (sizeof(VarDataLenT) + CHAR_BYTES);
} else if (TSDB_DATA_TYPE_NCHAR == type) {
int len = sizeof(VarDataLenT) + TSDB_NCHAR_SIZE;
pBuilder->allNullLen += len;
pHelper->allNullLen += len;
}
}
}
@ -404,18 +403,19 @@ int32_t tsParseOneColumn(SSchema *pSchema, SStrToken *pToken, char *payload, cha
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE uint16_t tsSetColumnValue(char *payload, int16_t columnId, uint8_t columnType, void *value,
uint16_t valueLen) {
static FORCE_INLINE TDRowLenT tsSetPayloadColValue(char *payloadStart, char *payload, int16_t columnId,
uint8_t columnType, const void *value, uint16_t valueLen, TDRowTLenT tOffset) {
payloadColSetId(payload, columnId);
payloadColSetType(payload, columnType);
memcpy(payloadColValue(payload), value, valueLen);
return PAYLOAD_ID_TYPE_LEN + valueLen;
memcpy(POINTER_SHIFT(payloadStart,tOffset), value, valueLen);
payloadSetTLen(payloadStart, payloadTLen(payloadStart) + valueLen);
return valueLen;
}
static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *primaryKeyStart, char *payload, char *msg,
char **str, bool primaryKey, int16_t timePrec, TDRowLenT *sizeAppend, bool *isColNull,
TDRowLenT *dataRowColDeltaLen, TDRowLenT *kvRowColLen) {
static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *payloadStart, char *primaryKeyStart,
char *payload, char *msg, char **str, bool primaryKey, int16_t timePrec,
TDRowTLenT tOffset, TDRowLenT *sizeAppend, TDRowLenT *dataRowColDeltaLen,
TDRowLenT *kvRowColLen) {
int64_t iv;
int32_t ret;
char * endptr = NULL;
@ -427,29 +427,30 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
switch (pSchema->type) {
case TSDB_DATA_TYPE_BOOL: { // bool
if (isNullStr(pToken)) {
*isColNull = true;
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_BOOL), TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset);
} else {
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
if (strncmp(pToken->z, "true", pToken->n) == 0) {
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &TRUE_VALUE, TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &TRUE_VALUE,
TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &FALSE_VALUE, TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &FALSE_VALUE,
TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
} else {
return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_INTEGER) {
iv = strtoll(pToken->z, NULL, 10);
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type,
((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
} else if (pToken->type == TK_FLOAT) {
double dv = strtod(pToken->z, NULL);
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type,
((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]);
} else {
return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z);
@ -460,7 +461,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
case TSDB_DATA_TYPE_TINYINT:
if (isNullStr(pToken)) {
*isColNull = true;
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_TINYINT), TYPE_BYTES[TSDB_DATA_TYPE_TINYINT], tOffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
@ -470,8 +472,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
}
uint8_t tmpVal = (uint8_t)iv;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_TINYINT]);
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
TYPE_BYTES[TSDB_DATA_TYPE_TINYINT], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TINYINT]);
}
@ -479,7 +481,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
case TSDB_DATA_TYPE_UTINYINT:
if (isNullStr(pToken)) {
*isColNull = true;
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_UTINYINT), TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT], tOffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
@ -489,8 +492,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
}
uint8_t tmpVal = (uint8_t)iv;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT]);
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT]);
}
@ -498,8 +501,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
case TSDB_DATA_TYPE_SMALLINT:
if (isNullStr(pToken)) {
// *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
*isColNull = true;
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_SMALLINT), TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT], tOffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
@ -509,8 +512,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
}
int16_t tmpVal = (int16_t)iv;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT]);
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT]);
}
@ -518,7 +521,9 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
case TSDB_DATA_TYPE_USMALLINT:
if (isNullStr(pToken)) {
*isColNull = true;
*sizeAppend =
tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_USMALLINT), TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT], tOffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
@ -528,8 +533,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
}
uint16_t tmpVal = (uint16_t)iv;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT]);
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT]);
}
@ -537,7 +542,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
case TSDB_DATA_TYPE_INT:
if (isNullStr(pToken)) {
*isColNull = true;
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_INT), TYPE_BYTES[TSDB_DATA_TYPE_INT], tOffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
@ -547,7 +553,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
}
int32_t tmpVal = (int32_t)iv;
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_INT]);
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
TYPE_BYTES[TSDB_DATA_TYPE_INT], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_INT]);
}
@ -555,7 +562,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
case TSDB_DATA_TYPE_UINT:
if (isNullStr(pToken)) {
*isColNull = true;
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_UINT), TYPE_BYTES[TSDB_DATA_TYPE_UINT], tOffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
@ -565,8 +573,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
}
uint32_t tmpVal = (uint32_t)iv;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UINT]);
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
TYPE_BYTES[TSDB_DATA_TYPE_UINT], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UINT]);
}
@ -574,7 +582,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
case TSDB_DATA_TYPE_BIGINT:
if (isNullStr(pToken)) {
*isColNull = true;
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_BIGINT), TYPE_BYTES[TSDB_DATA_TYPE_BIGINT], tOffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
@ -583,14 +592,16 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z);
}
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &iv, TYPE_BYTES[TSDB_DATA_TYPE_BIGINT]);
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &iv,
TYPE_BYTES[TSDB_DATA_TYPE_BIGINT], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BIGINT]);
}
break;
case TSDB_DATA_TYPE_UBIGINT:
if (isNullStr(pToken)) {
*isColNull = true;
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_UBIGINT), TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT], tOffset);
} else {
ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
@ -600,15 +611,16 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
}
uint64_t tmpVal = (uint64_t)iv;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT]);
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT]);
}
break;
case TSDB_DATA_TYPE_FLOAT:
if (isNullStr(pToken)) {
*isColNull = true;
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_FLOAT), TYPE_BYTES[TSDB_DATA_TYPE_FLOAT], tOffset);
} else {
double dv;
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
@ -621,15 +633,16 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
}
float tmpVal = (float)dv;
*sizeAppend =
tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_FLOAT]);
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal,
TYPE_BYTES[TSDB_DATA_TYPE_FLOAT], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_FLOAT]);
}
break;
case TSDB_DATA_TYPE_DOUBLE:
if (isNullStr(pToken)) {
*isColNull = true;
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_DOUBLE), TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE], tOffset);
} else {
double dv;
if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) {
@ -640,7 +653,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
return tscInvalidOperationMsg(msg, "illegal double data", pToken->z);
}
*sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &dv, TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE]);
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &dv,
TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE]);
}
break;
@ -648,21 +662,22 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
case TSDB_DATA_TYPE_BINARY:
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost
if (pToken->type == TK_NULL) {
// setVardataNull(payload, TSDB_DATA_TYPE_BINARY);
*isColNull = true;
payloadColSetId(payload, pSchema->colId);
payloadColSetType(payload, pSchema->type);
memcpy(POINTER_SHIFT(payloadStart, tOffset), tdGetNullVal(TSDB_DATA_TYPE_BINARY), VARSTR_HEADER_SIZE + CHAR_BYTES);
*sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + CHAR_BYTES);
} else { // too long values will return invalid sql, not be truncated automatically
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
return tscInvalidOperationMsg(msg, "string data overflow", pToken->z);
}
// STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
payloadColSetId(payload, pSchema->colId);
payloadColSetType(payload, pSchema->type);
varDataSetLen(payloadColValue(payload), pToken->n);
memcpy(varDataVal(payloadColValue(payload)), pToken->z, pToken->n);
*sizeAppend = (TDRowLenT)(PAYLOAD_ID_TYPE_LEN + VARSTR_HEADER_SIZE + pToken->n);
*dataRowColDeltaLen += (TDRowLenT)(pToken->n - sizeof(uint8_t));
varDataSetLen(POINTER_SHIFT(payloadStart,tOffset), pToken->n);
memcpy(varDataVal(POINTER_SHIFT(payloadStart,tOffset)), pToken->z, pToken->n);
*sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + pToken->n);
*dataRowColDeltaLen += (TDRowLenT)(pToken->n - CHAR_BYTES);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + VARSTR_HEADER_SIZE + pToken->n);
}
@ -670,22 +685,25 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
case TSDB_DATA_TYPE_NCHAR:
if (pToken->type == TK_NULL) {
*isColNull = true;
payloadColSetId(payload, pSchema->colId);
payloadColSetType(payload, pSchema->type);
memcpy(POINTER_SHIFT(payloadStart,tOffset), tdGetNullVal(TSDB_DATA_TYPE_NCHAR), VARSTR_HEADER_SIZE + INT_BYTES);
*sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + INT_BYTES);
} else {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
payloadColSetId(payload, pSchema->colId);
payloadColSetType(payload, pSchema->type);
if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payloadColValue(payload)),
if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(POINTER_SHIFT(payloadStart,tOffset)),
pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
char buf[512] = {0};
snprintf(buf, tListLen(buf), "%s", strerror(errno));
return tscInvalidOperationMsg(msg, buf, pToken->z);
}
varDataSetLen(payloadColValue(payload), output);
varDataSetLen(POINTER_SHIFT(payloadStart,tOffset), output);
*sizeAppend = (TDRowLenT)(PAYLOAD_ID_TYPE_LEN + VARSTR_HEADER_SIZE + output);
*sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + output);
*dataRowColDeltaLen += (TDRowLenT)(output - sizeof(uint32_t));
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + VARSTR_HEADER_SIZE + output);
}
@ -696,12 +714,13 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
if (primaryKey) {
// When building SKVRow primaryKey, we should not skip even with NULL value.
int64_t tmpVal = 0;
*sizeAppend = tsSetColumnValue(primaryKeyStart, pSchema->colId, pSchema->type, &tmpVal,
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]);
*sizeAppend = tsSetPayloadColValue(payloadStart, primaryKeyStart, pSchema->colId, pSchema->type, &tmpVal,
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]);
} else {
*isColNull = true;
*sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type,
tdGetNullVal(TSDB_DATA_TYPE_TIMESTAMP),
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], tOffset);
}
} else {
int64_t tmpVal;
@ -709,8 +728,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri
return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z);
}
*sizeAppend = tsSetColumnValue(primaryKey ? primaryKeyStart : payload, pSchema->colId, pSchema->type, &tmpVal,
TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]);
*sizeAppend = tsSetPayloadColValue(payloadStart, primaryKey ? primaryKeyStart : payload, pSchema->colId,
pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], tOffset);
*kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]);
}
@ -762,27 +781,31 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
int32_t index = 0;
SStrToken sToken = {0};
SMemRowBuilder *pBuilder = &pDataBlocks->rowBuilder;
char * payload = pDataBlocks->pData + pDataBlocks->size;
SMemRowHelper *pHelper = &pDataBlocks->rowHelper;
char * payload = pDataBlocks->pData + pDataBlocks->size;
SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo;
SSchema * schema = tscGetTableSchema(pDataBlocks->pTableMeta);
// 1. set the parsed value from sql string
int32_t rowSize = 0;
uint16_t rowSizeAppended = 0;
uint16_t nColsNotNull = 0;
TDRowLenT dataRowLen = pBuilder->allNullLen;
TDRowLenT kvRowLen = TD_MEM_ROW_KV_VER_SIZE;
int32_t dataRowLen = pHelper->allNullLen;
int32_t kvRowLen = TD_MEM_ROW_KV_VER_SIZE;
TDRowTLenT payloadValOffset = 0;
TDRowLenT colValOffset = 0;
ASSERT(dataRowLen > 0);
payloadSetNCols(payload, spd->numOfBound);
payloadValOffset = payloadValuesOffset(payload); // rely on payloadNCols
payloadSetTLen(payload, payloadValOffset);
char *kvPrimayKeyStart = payload + PAYLOAD_HEADER_LEN; // primaryKey in 1st column tuple
char *kvStart = kvPrimayKeyStart + PAYLOAD_PRIMARY_COL_LEN; // the column tuple behind the primaryKey
char *kvPrimaryKeyStart = payload + PAYLOAD_HEADER_LEN; // primaryKey in 1st column tuple
char *kvStart = kvPrimaryKeyStart + PAYLOAD_COL_HEAD_LEN; // the column tuple behind the primaryKey
// 1. set the parsed value from sql string
for (int i = 0; i < spd->numOfBound; ++i) {
// the start position in data block buffer of current value in sql
int32_t colIndex = spd->boundedColumns[i]; // ordered
int32_t colIndex = spd->boundedColumns[i];
char *start = payload + spd->cols[colIndex].offset;
@ -845,43 +868,61 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i
}
bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX);
bool isColNull = false;
TDRowLenT dataRowDeltaColLen = 0; // When combine the data as SDataRow, the delta len between all NULL columns.
TDRowLenT kvRowColLen = 0;
TDRowLenT colSizeAppended = 0;
// make sure the Primarykey locates in the 1st column
int32_t ret = tsParseOneColumnKV(pSchema, &sToken, kvPrimayKeyStart, kvStart, pInsertParam->msg, str, isPrimaryKey,
timePrec, &colSizeAppended, &isColNull, &dataRowDeltaColLen, &kvRowColLen);
TDRowLenT colValAppended = 0;
if(!spd->isOrdered) {
ASSERT(spd->colIdxInfo != NULL);
if(!isPrimaryKey) {
kvStart = POINTER_SHIFT(kvPrimaryKeyStart, spd->colIdxInfo[i].finalIdx * PAYLOAD_COL_HEAD_LEN);
} else {
ASSERT(spd->colIdxInfo[i].finalIdx == 0);
}
} else {
ASSERT(spd->colIdxInfo == NULL);
}
// the primary key locates in 1st column
int32_t ret = tsParseOneColumnKV(pSchema, &sToken, payload, kvPrimaryKeyStart, kvStart, pInsertParam->msg, str,
isPrimaryKey, timePrec, payloadValOffset + colValOffset, &colValAppended,
&dataRowDeltaColLen, &kvRowColLen);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, payloadColValue(kvPrimayKeyStart)) != TSDB_CODE_SUCCESS) {
tscInvalidOperationMsg(pInsertParam->msg, "client time/server time can not be mixed up", sToken.z);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
if (isColNull == false) {
++nColsNotNull;
if (isPrimaryKey) {
if (tsCheckTimestamp(pDataBlocks, payloadValues(payload)) != TSDB_CODE_SUCCESS) {
tscInvalidOperationMsg(pInsertParam->msg, "client time/server time can not be mixed up", sToken.z);
return TSDB_CODE_TSC_INVALID_TIME_STAMP;
}
payloadColSetOffset(kvPrimaryKeyStart, colValOffset);
} else {
payloadColSetOffset(kvStart, colValOffset);
if(spd->isOrdered) {
kvStart += PAYLOAD_COL_HEAD_LEN; // move to next column
}
}
colValOffset += colValAppended;
kvRowLen += kvRowColLen;
dataRowLen += dataRowDeltaColLen;
if (!isPrimaryKey) {
kvStart += colSizeAppended; // move to next column
}
rowSizeAppended += colSizeAppended; // calculate rowLen
}
if (kvRowLen < dataRowLen) {
if (kvRowLen < dataRowLen * KVRowRatio) {
payloadSetType(payload, SMEM_ROW_KV);
} else {
payloadSetType(payload, SMEM_ROW_DATA);
}
*len = PAYLOAD_HEADER_LEN + rowSizeAppended;
payloadSetNCols(payload, nColsNotNull);
ASSERT(colValOffset <= TSDB_MAX_BYTES_PER_ROW);
*len = (int32_t)(payloadValOffset + colValOffset);
payloadSetTLen(payload, *len);
// TSKEY tsKey = payloadKey(payload);
// ASSERT((tsKey < 1627747200000000 && tsKey > 1498838400000000) || (tsKey < 1627747200000 && tsKey > 1498838400000) ||
// (tsKey < 1627747200 && tsKey > 1498838400));
return TSDB_CODE_SUCCESS;
}
@ -895,6 +936,27 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) {
return left > right ? 1 : -1;
}
}
/*
 * qsort() comparator: orders elements in ascending order of the uint16_t
 * stored at the very start of each element.
 *
 * Returns -1 when lhs sorts first, 1 when rhs sorts first, 0 when equal.
 */
static int32_t schemaIdxCompar(const void *lhs, const void *rhs) {
  const uint16_t a = *(const uint16_t *)lhs;
  const uint16_t b = *(const uint16_t *)rhs;

  if (a < b) {
    return -1;
  }
  if (a > b) {
    return 1;
  }
  return 0;
}
/*
 * qsort() comparator: orders elements in ascending order of the uint16_t
 * located sizeof(uint16_t) bytes into each element (i.e. the second
 * uint16_t slot of the element).
 *
 * Returns -1 when lhs sorts first, 1 when rhs sorts first, 0 when equal.
 */
static int32_t boundIdxCompar(const void *lhs, const void *rhs) {
  const uint16_t a = *(const uint16_t *)POINTER_SHIFT(lhs, sizeof(uint16_t));
  const uint16_t b = *(const uint16_t *)POINTER_SHIFT(rhs, sizeof(uint16_t));

  if (a < b) {
    return -1;
  }
  if (a > b) {
    return 1;
  }
  return 0;
}
int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SInsertStatementParam *pInsertParam,
int32_t* numOfRows, char *tmpTokenBuf) {
@ -912,8 +974,8 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
int32_t extendedRowSize = getExtendedRowSize(&tinfo);
initSMemRowBuilder(&pDataBlock->rowBuilder, tscGetTableSchema(pDataBlock->pTableMeta),
tscGetNumOfColumns(pDataBlock->pTableMeta), 0);
initSMemRowHelper(&pDataBlock->rowHelper, tscGetTableSchema(pDataBlock->pTableMeta),
tscGetNumOfColumns(pDataBlock->pTableMeta), 0);
while (1) {
index = 0;
@ -965,9 +1027,10 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn
void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols) {
pColInfo->numOfCols = numOfCols;
pColInfo->numOfBound = numOfCols;
pColInfo->isOrdered = true;
pColInfo->boundedColumns = calloc(pColInfo->numOfCols, sizeof(int32_t));
pColInfo->cols = calloc(pColInfo->numOfCols, sizeof(SBoundColumn));
pColInfo->colIdxInfo = NULL;
for (int32_t i = 0; i < pColInfo->numOfCols; ++i) {
if (i > 0) {
@ -1093,14 +1156,23 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk
SBlockKeyTuple *pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
char * pBlockData = pBlocks->data;
uint32_t totolPayloadLen = 0;
TDRowLenT payloadTLen = 0;
TDRowTLenT totolPayloadTLen = 0;
TDRowTLenT payloadTLen = 0;
int n = 0;
while (n < nRows) {
pBlkKeyTuple->skey = payloadKey(pBlockData);
pBlkKeyTuple->payloadAddr = pBlockData;
payloadTLen = payloadTLen(pBlockData);
totolPayloadLen += payloadTLen;
ASSERT(payloadNCols(pBlockData) <= 4096);
ASSERT(payloadTLen(pBlockData) < 65536);
ASSERT(pBlkKeyTuple->payloadAddr != NULL);
ASSERT((pBlkKeyTuple->skey < 1627747200000000 && pBlkKeyTuple->skey > 1498838400000000) ||
(pBlkKeyTuple->skey < 1627747200000 && pBlkKeyTuple->skey > 1498838400000) ||
(pBlkKeyTuple->skey < 1627747200 && pBlkKeyTuple->skey > 1498838400));
totolPayloadTLen += payloadTLen;
// next loop
pBlockData += payloadTLen;
++pBlkKeyTuple;
@ -1119,23 +1191,36 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk
TSKEY tj = (pBlkKeyTuple + j)->skey;
if (ti == tj) {
totolPayloadLen -= payloadTLen(pBlkKeyTuple + j);
totolPayloadTLen -= payloadTLen(pBlkKeyTuple + j);
++j;
continue;
}
int32_t nextPos = (++i);
if (nextPos != j) {
memmove(pBlkKeyTuple + sizeof(SBlockKeyTuple) * nextPos, pBlkKeyTuple + sizeof(SBlockKeyTuple) * j, sizeof(SBlockKeyTuple));
memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple));
}
++j;
}
dataBuf->ordered = true;
pBlocks->numOfRows = i + 1;
ASSERT(pBlocks->numOfRows <= nRows);
int tt = 0;
pBlkKeyTuple = pBlkKeyInfo->pKeyTuple;
while (tt < pBlocks->numOfRows) {
ASSERT(pBlkKeyTuple->payloadAddr != NULL);
ASSERT((pBlkKeyTuple->skey < 1627747200000000 && pBlkKeyTuple->skey > 1498838400000000) ||
(pBlkKeyTuple->skey < 1627747200000 && pBlkKeyTuple->skey > 1498838400000) ||
(pBlkKeyTuple->skey < 1627747200 && pBlkKeyTuple->skey > 1498838400));
++pBlkKeyTuple;
++tt;
}
}
dataBuf->size = sizeof(SSubmitBlk) + totolPayloadLen;
dataBuf->size = sizeof(SSubmitBlk) + totolPayloadTLen;
dataBuf->prevTS = INT64_MIN;
return 0;
@ -1475,7 +1560,6 @@ static int32_t validateDataSource(SInsertStatementParam *pInsertParam, int32_t t
static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo* pColInfo, SSchema* pSchema,
char* str, char **end) {
pColInfo->numOfBound = 0;
memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * pColInfo->numOfCols);
for(int32_t i = 0; i < pColInfo->numOfCols; ++i) {
pColInfo->cols[i].hasVal = false;
@ -1483,7 +1567,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
int32_t code = TSDB_CODE_SUCCESS;
int32_t index = 0;
int32_t index = 0;
SStrToken sToken = tStrGetToken(str, &index, false);
str += index;
@ -1491,7 +1575,8 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
code = tscSQLSyntaxErrMsg(pInsertParam->msg, "( is expected", sToken.z);
goto _clean;
}
bool isOrdered = true;
int32_t lastColIdx = -1;
while (1) {
index = 0;
sToken = tStrGetToken(str, &index, false);
@ -1523,6 +1608,14 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
pColInfo->boundedColumns[pColInfo->numOfBound] = t;
pColInfo->numOfBound += 1;
findColumnIndex = true;
if (isOrdered) {
if (lastColIdx > t) {
isOrdered = false;
} else {
lastColIdx = t;
}
}
break;
}
}
@ -1533,10 +1626,32 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat
}
}
memset(&pColInfo->boundedColumns[pColInfo->numOfBound], 0 , sizeof(int32_t) * (pColInfo->numOfCols - pColInfo->numOfBound));
pColInfo->isOrdered = isOrdered;
if (!isOrdered) {
pColInfo->colIdxInfo = calloc(pColInfo->numOfBound, sizeof(SBoundIdxInfo));
if (pColInfo->colIdxInfo == NULL) {
code = TSDB_CODE_TSC_OUT_OF_MEMORY;
goto _clean;
}
SBoundIdxInfo *pColIdx = pColInfo->colIdxInfo;
for (int i = 0; i < pColInfo->numOfBound; ++i) {
pColIdx[i].schemaColIdx = (uint16_t)pColInfo->boundedColumns[i];
pColIdx[i].boundIdx = (uint16_t)i;
}
qsort(pColIdx, pColInfo->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar);
for (int i = 0; i < pColInfo->numOfBound; ++i) {
pColIdx[i].finalIdx = (uint16_t)i;
}
qsort(pColIdx, pColInfo->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar);
}
memset(&pColInfo->boundedColumns[pColInfo->numOfBound], 0,
sizeof(int32_t) * (pColInfo->numOfCols - pColInfo->numOfBound));
return TSDB_CODE_SUCCESS;
_clean:
_clean:
pInsertParam->sql = NULL;
return code;
}
@ -1944,8 +2059,8 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow
goto _error;
}
initSMemRowBuilder(&pTableDataBlock->rowBuilder, tscGetTableSchema(pTableDataBlock->pTableMeta),
tscGetNumOfColumns(pTableDataBlock->pTableMeta), 0);
initSMemRowHelper(&pTableDataBlock->rowHelper, tscGetTableSchema(pTableDataBlock->pTableMeta),
tscGetNumOfColumns(pTableDataBlock->pTableMeta), 0);
while ((readLen = tgetline(&line, &n, fp)) != -1) {
if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) {

View File

@ -1433,6 +1433,7 @@ void tscFreeSqlObj(SSqlObj* pSql) {
/**
 * Release the arrays owned by a parsed bound-column descriptor.
 *
 * Frees boundedColumns, cols and colIdxInfo through tfree(). The descriptor
 * structure itself is not freed here.
 *
 * @param pColInfo descriptor whose member arrays are released; a NULL pointer
 *                 is ignored so the function can be called on cleanup paths
 *                 without a prior check.
 */
void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) {
  if (pColInfo == NULL) {
    return;
  }

  tfree(pColInfo->boundedColumns);
  tfree(pColInfo->cols);
  tfree(pColInfo->colIdxInfo);  // index mapping for bound columns (see SBoundIdxInfo)
}
void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) {
@ -1646,51 +1647,54 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i
return TSDB_CODE_SUCCESS;
}
static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
SSchema* pSchema = pBuilder->pSchema;
char* p = (char*)pBuilder->buf;
int toffset = 0;
uint16_t nCols = pBuilder->nCols;
// RawRow payload structure:
// |<---------- header ------------->|<------- column data array ------->|
// |SMemRowType| dataLen | nCols | colId | colType | value |...|...|
// +-----------+----------+----------+---------------------------------->|
// | uint8_t | uint16_t | uint16_t | int16_t | uint8_t | ??? |...|...|
// +-----------+----------+----------+---------------------------------->|
uint8_t memRowType = payloadType(p);
uint16_t nColsNotNull = payloadNCols(p);
if (pBuilder->nCols <= 0 || nColsNotNull <= 0) {
uint16_t nColsBound = payloadNCols(p);
if (pBuilder->nCols <= 0 || nColsBound <= 0) {
return NULL;
}
ASSERT(nColsNotNull <= nCols);
char* pVals = POINTER_SHIFT(p, payloadValuesOffset(p));
SMemRow* memRow = (SMemRow)pBuilder->pDataBlock;
memRowSetType(memRow, memRowType);
// ----------------- Raw payload structure for row:
/* |<------------ Head ------------->|<----------- body of column data tuple ------------------->|
* | |<----------------- flen ------------->|<--- value part --->|
* |SMemRowType| dataTLen | nCols | colId | colType | offset | ... | value |...|...|... |
* +-----------+----------+----------+--------------------------------------|--------------------|
* | uint8_t | uint32_t | uint16_t | int16_t | uint8_t | uint16_t | ... |.......|...|...|... |
* +-----------+----------+----------+--------------------------------------+--------------------|
* 1. offset in column data tuple starts from the value part in case of uint16_t overflow.
* 2. dataTLen: total length including the header and body.
*/
if (memRowType == SMEM_ROW_DATA) {
ASSERT(nColsBound <= nCols);
SDataRow trow = (SDataRow)memRowDataBody(memRow);
dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen));
dataRowSetLen(trow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen));
dataRowSetVersion(trow, pBuilder->sversion);
p = (char*)payloadBody(pBuilder->buf);
uint16_t i = 0, j = 0;
while (j < pBuilder->nCols) {
if (i >= nColsNotNull) {
while (j < nCols) {
if (i >= nColsBound) {
break;
}
int16_t colId = *(int16_t*)p;
int16_t colId = payloadColId(p);
if (colId == pSchema[j].colId) {
tdAppendColVal(trow, payloadColValue(p), pSchema[j].type, toffset);
// ASSERT(payloadColType(p) == pSchema[j].type);
tdAppendColVal(trow, POINTER_SHIFT(pVals, payloadColOffset(p)), pSchema[j].type, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
p = skipToNextEles(p);
p = payloadNextCol(p);
++i;
++j;
} else if (colId < pSchema[j].colId) {
p = skipToNextEles(p);
p = payloadNextCol(p);
++i;
} else {
tdAppendColVal(trow, tdGetNullVal(pSchema[j].type), pSchema[j].type, toffset);
@ -1699,41 +1703,43 @@ static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) {
}
}
while (j < pBuilder->nCols) {
while (j < nCols) {
tdAppendColVal(trow, tdGetNullVal(pSchema[j].type), pSchema[j].type, toffset);
toffset += TYPE_BYTES[pSchema[j].type];
++j;
}
while (i < nColsNotNull) {
p = skipToNextEles(p);
#if 0 // no need anymore
while (i < nColsBound) {
p = payloadNextCol(p);
++i;
}
#endif
} else if (memRowType == SMEM_ROW_KV) {
ASSERT(nColsNotNull <= pBuilder->nCols);
ASSERT(nColsBound <= nCols);
SKVRow kvRow = (SKVRow)memRowKvBody(memRow);
uint16_t tlen = TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nColsNotNull;
kvRowSetLen(kvRow, tlen);
kvRowSetNCols(kvRow, nColsNotNull);
memRowKvSetVersion(memRow, pBuilder->sversion);
kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nColsBound));
kvRowSetNCols(kvRow, nColsBound);
memRowSetKvVersion(memRow, pBuilder->sversion);
p = (char*)payloadBody(pBuilder->buf);
int i = 0;
while (i < nColsNotNull) {
while (i < nColsBound) {
int16_t colId = payloadColId(p);
uint8_t colType = payloadColType(p);
tdAppendKvColVal(kvRow, payloadColValue(p), colId, colType, toffset);
tdAppendKvColVal(kvRow, POINTER_SHIFT(pVals,payloadColOffset(p)), colId, colType, toffset);
toffset += sizeof(SColIdx);
p = skipToNextEles(p);
p = payloadNextCol(p);
++i;
}
} else {
ASSERT(0);
}
pBuilder->pDataBlock = (char*)pBuilder->pDataBlock + memRowTLen(memRow); // next row
pBuilder->pSubmitBlk->dataLen += memRowTLen(memRow);
int32_t rowTLen = memRowTLen(memRow);
pBuilder->pDataBlock = (char*)pBuilder->pDataBlock + rowTLen; // next row
pBuilder->pSubmitBlk->dataLen += rowTLen;
return memRow;
}
@ -1744,7 +1750,6 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
STableMeta* pTableMeta = pTableDataBlock->pTableMeta;
STableComInfo tinfo = tscGetTableInfo(pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMeta);
SMemRowBuilder* pBuilder = &pTableDataBlock->rowBuilder;
SSubmitBlk* pBlock = pDataBlock;
memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk));
@ -1780,18 +1785,19 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
pBlock->dataLen = 0;
int32_t numOfRows = htons(pBlock->numOfRows);
pBuilder->pSchema = pSchema;
pBuilder->sversion = pTableMeta->sversion;
pBuilder->flen = flen;
pBuilder->nCols = tinfo.numOfColumns;
pBuilder->pDataBlock = pDataBlock;
pBuilder->pSubmitBlk = pBlock;
pBuilder->buf = p;
pBuilder->size = 0;
SMemRowBuilder rowBuilder;
rowBuilder.pSchema = pSchema;
rowBuilder.sversion = pTableMeta->sversion;
rowBuilder.flen = flen;
rowBuilder.nCols = tinfo.numOfColumns;
rowBuilder.pDataBlock = pDataBlock;
rowBuilder.pSubmitBlk = pBlock;
rowBuilder.buf = p;
rowBuilder.size = 0;
for (int32_t i = 0; i < numOfRows; ++i) {
pBuilder->buf = (blkKeyTuple+i)->payloadAddr;
tdGenMemRowFromBuilder(pBuilder);
rowBuilder.buf = (blkKeyTuple + i)->payloadAddr;
tdGenMemRowFromBuilder(&rowBuilder);
}
int32_t len = pBlock->dataLen + pBlock->schemaLen;
@ -1803,7 +1809,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo
static int32_t getRowExpandSize(STableMeta* pTableMeta) {
// add prefix len of KV type SMemRow(we may use SDataRow or SKVRow)
int32_t result = TD_DATA_ROW_HEAD_SIZE + TD_MEM_ROW_KV_TYPE_VER_SIZE;
int32_t result = TD_MEM_ROW_DATA_HEAD_SIZE;
int32_t columns = tscGetNumOfColumns(pTableMeta);
SSchema* pSchema = tscGetTableSchema(pTableMeta);
for(int32_t i = 0; i < columns; i++) {

View File

@ -186,18 +186,6 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) {
return 0;
}
}
/* A memory data row, the format is like below:
*|---------+---------------------+--------------------------- len ---------------------------------->|
*|<- type->|<-- Head -->|<--------- flen -------------->| |
*|---------+---------------------+---------------------------------+---------------------------------+
*| uint8_t | uint16_t | int16_t | | |
*|---------+----------+----------+---------------------------------+---------------------------------+
*| flag | len | sversion | First part | Second part |
*|---------+----------+----------+---------------------------------+---------------------------------+
*
* NOTE: timestamp in this row structure is TKEY instead of TSKEY
*/
typedef void *SMemRow;
// ----------------- Data row structure
@ -216,7 +204,7 @@ typedef void *SDataRow;
#define TD_DATA_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t))
#define dataRowLen(r) (*(uint16_t *)(r))
#define dataRowLen(r) (*(TDRowLenT *)(r)) // 0~65535
#define dataRowVersion(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t)))
#define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE)
#define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r)))
@ -231,7 +219,6 @@ SDataRow tdNewDataRowFromSchema(STSchema *pSchema);
void tdFreeDataRow(SDataRow row);
void tdInitDataRow(SDataRow row, STSchema *pSchema);
SDataRow tdDataRowDup(SDataRow row);
SMemRow tdMemRowDup(SMemRow row);
// offset here not include dataRow header length
static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t offset) {
@ -247,7 +234,7 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t t
if (offset == 0) {
ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP);
TKEY tvalue = tdGetTKEY(*(TSKEY *)value);
memcpy(POINTER_SHIFT(row, toffset), (void *)(&tvalue), TYPE_BYTES[type]);
memcpy(POINTER_SHIFT(row, toffset), (const void *)(&tvalue), TYPE_BYTES[type]);
} else {
memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]);
}
@ -287,7 +274,7 @@ void dataColSetOffset(SDataCol *pCol, int nEle);
bool isNEleNull(SDataCol *pCol, int nEle);
void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints);
static FORCE_INLINE const void *tdGetNullVal(int8_t type) {
FORCE_INLINE const void *tdGetNullVal(int8_t type) {
switch (type) {
case TSDB_DATA_TYPE_BOOL:
return &BoolNull;
@ -400,11 +387,11 @@ void tdResetDataCols(SDataCols *pCols);
int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData);
SDataCols *tdFreeDataCols(SDataCols *pCols);
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols);
int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset);
// ----------------- K-V data row structure
/*
/* |<-------------------------------------- len -------------------------------------------->|
* |<----- header ----->|<--------------------------- body -------------------------------->|
* +----------+----------+---------------------------------+---------------------------------+
* | uint16_t | int16_t | | |
* +----------+----------+---------------------------------+---------------------------------+
@ -420,7 +407,7 @@ typedef struct {
#define TD_KV_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t))
#define kvRowLen(r) (*(uint16_t *)(r))
#define kvRowLen(r) (*(TDRowLenT *)(r))
#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(uint16_t)))
#define kvRowSetLen(r, len) kvRowLen(r) = (len)
#define kvRowSetNCols(r, n) kvRowNCols(r) = (n)
@ -532,7 +519,7 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
// ----------------- SMemRow appended with sequential data row structure
/*
* |-------------------------------+--------------------------- len ---------------------------------->|
* |---------|------------------------------------------------- len ---------------------------------->|
* |<-------- Head ------>|<--------- flen -------------->| |
* |---------+---------------------+---------------------------------+---------------------------------+
* | uint8_t | uint16_t | int16_t | | |
@ -544,7 +531,8 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
*/
// ----------------- SMemRow appended with extended K-V data row structure
/* |
/* |--------------------|------------------------------------------------ len ---------------------------------->|
* |<------------- Head ------------>|<--------- flen -------------->| |
* |--------------------+----------+--------------------------------------------+---------------------------------+
* | uint8_t | int16_t | uint16_t | int16_t | | |
* |---------+----------+----------+----------+---------------------------------+---------------------------------+
@ -552,11 +540,13 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
* |---------+----------+----------+----------+---------------------------------+---------------------------------+
*/
typedef void *SMemRow;
#define TD_MEM_ROW_TYPE_SIZE sizeof(uint8_t)
#define TD_MEM_ROW_KV_VER_SIZE sizeof(int16_t)
#define TD_MEM_ROW_KV_TYPE_VER_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE)
#define TD_MEM_ROW_DATA_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_DATA_ROW_HEAD_SIZE)
#define TD_MEM_ROW_KV_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE + TD_KV_ROW_HEAD_SIZE)
// #define TD_MEM_ROW_KV_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE + TD_KV_ROW_HEAD_SIZE)
#define SMEM_ROW_DATA 0U // SDataRow
#define SMEM_ROW_KV 1U // SKVRow
@ -567,21 +557,22 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
#define memRowDataBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) // section after flag
#define memRowKvBody(r) \
POINTER_SHIFT(r, TD_MEM_ROW_KV_TYPE_VER_SIZE) // section after flag + sversion as to reuse of SKVRow
// #define memRowBody(r) (isDataRow(r) ? memRowDataBody(r) : memRowKvBody(r))
POINTER_SHIFT(r, TD_MEM_ROW_KV_TYPE_VER_SIZE) // section after flag + sversion as to reuse SKVRow
#define memRowDataLen(r) (*(TDRowLenT *)memRowDataBody(r))
#define memRowKvLen(r) (*(TDRowLenT *)memRowKvBody(r))
#define memRowDataTLen(r) (memRowDataLen(r) + (TDRowLenT)TD_MEM_ROW_TYPE_SIZE)
#define memRowKvTLen(r) (memRowKvLen(r) + (TDRowLenT)TD_MEM_ROW_KV_TYPE_VER_SIZE)
#define memRowDataLen(r) (*(TDRowLenT *)memRowDataBody(r)) // 0~65535
#define memRowKvLen(r) (*(TDRowLenT *)memRowKvBody(r)) // 0~65535
#define memRowDataTLen(r) (memRowDataLen(r) + TD_MEM_ROW_TYPE_SIZE) // using uint32_t/int32_t to store the TLen
#define memRowKvTLen(r) (memRowKvLen(r) + TD_MEM_ROW_KV_TYPE_VER_SIZE)
#define memRowLen(r) (isDataRow(r) ? memRowDataLen(r) : memRowKvLen(r))
#define memRowTLen(r) (isDataRow(r) ? memRowDataTLen(r) : memRowKvTLen(r))
#define memRowTLen(r) (isDataRow(r) ? memRowDataTLen(r) : memRowKvTLen(r)) // using uint32_t/int32_t to store the TLen
#define memRowDataVersion(r) dataRowVersion(memRowDataBody(r))
#define memRowKvVersion(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE))
#define memRowVersion(r) (isDataRow(r) ? memRowDataVersion(r) : memRowKvVersion(r)) // schema version
#define memRowKvSetVersion(r, v) (memRowKvVersion(r) = (v))
#define memRowSetKvVersion(r, v) (memRowKvVersion(r) = (v))
#define memRowTuple(r) (isDataRow(r) ? dataRowTuple(memRowDataBody(r)) : kvRowValues(memRowKvBody(r)))
#define memRowTKey(r) (isDataRow(r) ? dataRowTKey(memRowDataBody(r)) : kvRowTKey(memRowKvBody(r)))
@ -594,6 +585,8 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId,
#define memRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_MEM_ROW_DATA_HEAD_SIZE)
#define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r))
SMemRow tdMemRowDup(SMemRow row);
void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols);
// NOTE: the offset passed here includes the header size.
// Returns POINTER_SHIFT(row, offset), i.e. the location of the column data inside the row.
static FORCE_INLINE void *tdGetKvRowDataOfCol(void *row, int32_t offset) {
  void *colData = POINTER_SHIFT(row, offset);
  return colData;
}
// NOTE: offset here including the header size
@ -608,50 +601,52 @@ static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int8_t type, int32_t o
return NULL;
}
// RawRow payload structure:
// |<---------- header ------------->|<---- body: column data tuple ---->|
// |SMemRowType| dataLen | nCols | colId | colType | value |...|...|
// +-----------+----------+----------+---------------------------------->|
// | uint8_t | uint16_t | uint16_t | int16_t | uint8_t | ??? |...|...|
// +-----------+----------+----------+---------------------------------->|
// ----------------- Raw payload structure for row:
/* |<------------ Head ------------->|<----------- body of column data tuple ------------------->|
* | |<----------------- flen ------------->|<--- value part --->|
* |SMemRowType| dataTLen | nCols | colId | colType | offset | ... | value |...|...|... |
* +-----------+----------+----------+--------------------------------------|--------------------|
* | uint8_t | uint32_t | uint16_t | int16_t | uint8_t | uint16_t | ... |.......|...|...|... |
* +-----------+----------+----------+--------------------------------------+--------------------|
* 1. offset in column data tuple starts from the value part in case of uint16_t overflow.
* 2. dataTLen: total length including the header and body.
*/
#define PAYLOAD_NCOLS_LEN sizeof(uint16_t)
#define PAYLOAD_NCOLS_OFFSET (sizeof(uint8_t) + sizeof(TDRowLenT))
#define PAYLOAD_NCOLS_OFFSET (sizeof(uint8_t) + sizeof(TDRowTLenT))
#define PAYLOAD_HEADER_LEN (PAYLOAD_NCOLS_OFFSET + PAYLOAD_NCOLS_LEN)
#define PAYLOAD_ID_LEN sizeof(int16_t)
#define PAYLOAD_ID_TYPE_LEN (sizeof(int16_t) + sizeof(uint8_t))
#define PAYLOAD_COL_HEAD_LEN (PAYLOAD_ID_TYPE_LEN + sizeof(uint16_t))
#define PAYLOAD_PRIMARY_COL_LEN (PAYLOAD_ID_TYPE_LEN + sizeof(TSKEY))
#define payloadBody(r) POINTER_SHIFT(r, PAYLOAD_HEADER_LEN)
#define payloadType(r) (*(uint8_t *)(r))
#define payloadSetType(r, t) (payloadType(r) = (t))
#define payloadTLen(r) (*(TDRowLenT *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) // including total header
#define payloadTLen(r) (*(TDRowTLenT *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) // including total header
#define payloadSetTLen(r, l) (payloadTLen(r) = (l))
#define payloadNCols(r) (*(TDRowLenT *)POINTER_SHIFT(r, PAYLOAD_NCOLS_OFFSET))
#define payloadSetNCols(r, n) (payloadNCols(r) = (n))
#define payloadValuesOffset(r) \
(PAYLOAD_HEADER_LEN + payloadNCols(r) * PAYLOAD_COL_HEAD_LEN) // avoid using the macro in loop
#define payloadValues(r) POINTER_SHIFT(r, payloadValuesOffset(r)) // avoid using the macro in loop
#define payloadColId(c) (*(int16_t *)(c))
#define payloadColType(c) (*(uint8_t *)POINTER_SHIFT(c, PAYLOAD_ID_LEN))
#define payloadColOffset(c) (*(uint16_t *)POINTER_SHIFT(c, PAYLOAD_ID_TYPE_LEN))
#define payloadColValue(c) POINTER_SHIFT(c, payloadColOffset(c))
#define payloadColId(r) (*(int16_t *)(r))
#define payloadColType(r) (*(uint8_t *)POINTER_SHIFT(r, PAYLOAD_ID_LEN))
#define payloadColValue(r) POINTER_SHIFT(r, PAYLOAD_ID_TYPE_LEN)
#define payloadColSetId(c, i) (payloadColId(c) = (i))
#define payloadColSetType(c, t) (payloadColType(c) = (t))
#define payloadColSetOffset(c, o) (payloadColOffset(c) = (o))
#define payloadColSetId(r, i) (payloadColId(r) = (i))
#define payloadColSetType(r, t) (payloadColType(r) = (t))
#define payloadKeyAddr(r) POINTER_SHIFT(r, PAYLOAD_HEADER_LEN + PAYLOAD_ID_TYPE_LEN)
#define payloadTKey(r) (*(TKEY *)(payloadKeyAddr(r)))
#define payloadKeyOffset(r) (*(uint16_t *)POINTER_SHIFT(r, PAYLOAD_HEADER_LEN + PAYLOAD_ID_TYPE_LEN))
#define payloadTKey(r) (*(TKEY *)POINTER_SHIFT(r, payloadValuesOffset(r) + payloadKeyOffset(r)))
#define payloadKey(r) tdGetKey(payloadTKey(r))
// Step over the column element at p and return the address just past it.
// The element is a colId/colType pair (PAYLOAD_ID_TYPE_LEN) followed by its value:
// fixed-size types take TYPE_BYTES[type], variable-length types take varDataTLen() of the value.
static FORCE_INLINE char *skipToNextEles(char *p) {
  const uint8_t type = payloadColType(p);

  if (!IS_VAR_DATA_TYPE(type)) {
    return (char *)POINTER_SHIFT(p, PAYLOAD_ID_TYPE_LEN + TYPE_BYTES[type]);
  }

  return (char *)POINTER_SHIFT(p, PAYLOAD_ID_TYPE_LEN + varDataTLen(payloadColValue(p)));
}
// Step from one column head in the payload to the next one.
// Each head occupies PAYLOAD_COL_HEAD_LEN (colId + colType + offset), so the
// result is pCol shifted by exactly that length.
static FORCE_INLINE char *payloadNextCol(char *pCol) {
  void *pNext = POINTER_SHIFT(pCol, PAYLOAD_COL_HEAD_LEN);
  return (char *)pNext;
}
#ifdef __cplusplus
}
#endif
#endif // _TD_DATA_FORMAT_H_
#endif // _TD_DATA_FORMAT_H_

View File

@ -195,7 +195,11 @@ do { \
#define TSDB_APPNAME_LEN TSDB_UNI_LEN
#define TSDB_MAX_BYTES_PER_ROW 65536
/**
* Don't change to 65536, as in some scenarios uint16_t (0~65535) is used to store the row length.
* We therefore use 65531 (65535 - 4), since SDataRow and SKVRow each include a 4-byte header.
*/
#define TSDB_MAX_BYTES_PER_ROW 65531
#define TSDB_MAX_TAGS_LEN 16384
#define TSDB_MAX_TAGS 128
#define TSDB_MAX_TAG_CONDITIONS 1024

View File

@ -10,9 +10,10 @@ extern "C" {
#include "taosdef.h"
// ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR
typedef int32_t VarDataOffsetT;
typedef int16_t VarDataLenT; // maxVarDataLen: 32767
typedef uint16_t TDRowLenT;
typedef int32_t VarDataOffsetT;
typedef int16_t VarDataLenT; // maxVarDataLen: 32767
typedef uint16_t TDRowLenT; // not including overhead: 0 ~ 65535
typedef uint32_t TDRowTLenT; // total length, including overhead
typedef struct tstr {
VarDataLenT len;

View File

@ -767,7 +767,7 @@ static int tsdbCopyRowToMem(STsdbRepo *pRepo, SMemRow row, STable *pTable, void
void *pRow = tsdbAllocBytes(pRepo, memRowTLen(row));
if (pRow == NULL) {
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s",
tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %" PRIu64 " bytes since %s",
REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), memRowTLen(row), tstrerror(terrno));
return -1;
}