From 1cd331a37618388b8efa1be7822736963c97cf28 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Sun, 11 Jul 2021 14:44:13 +0800 Subject: [PATCH] payload refactor to support 4096 --- src/client/inc/tscUtil.h | 3 - src/client/inc/tsclient.h | 30 ++- src/client/src/tscParseInsert.c | 349 +++++++++++++++++++++----------- src/client/src/tscUtil.c | 98 ++++----- src/common/inc/tdataformat.h | 105 +++++----- src/inc/taosdef.h | 6 +- src/inc/ttype.h | 7 +- src/tsdb/src/tsdbMemTable.c | 2 +- 8 files changed, 365 insertions(+), 235 deletions(-) diff --git a/src/client/inc/tscUtil.h b/src/client/inc/tscUtil.h index c962a1ef42..f1a7af01a5 100644 --- a/src/client/inc/tscUtil.h +++ b/src/client/inc/tscUtil.h @@ -40,9 +40,6 @@ extern "C" { #define UTIL_TABLE_IS_TMP_TABLE(metaInfo) \ (((metaInfo)->pTableMeta != NULL) && ((metaInfo)->pTableMeta->tableType == TSDB_TEMP_TABLE)) -#define KvRowNColsThresh 1 // default 1200 -#define KVRowRatio 0.85 // for NonVarType, we get value from SDataRow directly, while needs readdressing for SKVRow - #pragma pack(push,1) // this struct is transfered as binary, padding two bytes to avoid // an 'uid' whose low bytes is 0xff being recoginized as NULL, diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 3771dbe575..c9428504b8 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -41,6 +41,9 @@ extern "C" { // forward declaration struct SSqlInfo; +#define KvRowNColsThresh 128 // default 128 +#define KVRowRatio 0.9 // for NonVarType, we get value from SDataRow directly, while needs readdressing for SKVRow + typedef void (*__async_cb_func_t)(void *param, TAOS_RES *tres, int32_t numOfRows); typedef struct SNewVgroupInfo { @@ -87,12 +90,18 @@ typedef struct SBoundColumn { bool hasVal; // denote if current column has bound or not int32_t offset; // all column offset value } SBoundColumn; - +typedef struct { + uint16_t schemaColIdx; + uint16_t boundIdx; + uint16_t finalIdx; +} SBoundIdxInfo; typedef struct SParsedDataColInfo { - int16_t numOfCols; - int16_t numOfBound; - int32_t *boundedColumns; - SBoundColumn *cols; + bool isOrdered; // bounded columns + int16_t numOfCols; + int16_t numOfBound; + int32_t * boundedColumns; // bounded column idx according to schema + SBoundColumn * cols; + SBoundIdxInfo *colIdxInfo; } SParsedDataColInfo; typedef struct { @@ -107,9 +116,12 @@ typedef struct { void * pDataBlock; SSubmitBlk *pSubmitBlk; - uint16_t allNullLen; } SMemRowBuilder; +typedef struct { + TDRowLenT allNullLen; +} SMemRowHelper; + typedef struct STableDataBlocks { SName tableName; int8_t tsSource; // where does the UNIX timestamp come from, server or client @@ -130,7 +142,7 @@ typedef struct STableDataBlocks { uint32_t numOfAllocedParams; uint32_t numOfParams; SParamInfo * params; - SMemRowBuilder rowBuilder; + SMemRowHelper rowHelper; } STableDataBlocks; typedef struct { @@ -398,7 +410,7 @@ extern int tscRefId; extern int tscNumOfObj; // number of existed sqlObj in current process. extern int (*tscBuildMsg[TSDB_SQL_MAX])(SSqlObj *pSql, SSqlInfo *pInfo); - + void tscBuildVgroupTableInfo(SSqlObj* pSql, STableMetaInfo* pTableMetaInfo, SArray* tables); int16_t getNewResColId(SSqlCmd* pCmd); @@ -406,4 +418,4 @@ int16_t getNewResColId(SSqlCmd* pCmd); } #endif -#endif +#endif \ No newline at end of file diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index dd5980053c..f0a9421f30 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -46,23 +46,22 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat char *str, char **end); static FORCE_INLINE int32_t getExtendedRowSize(STableComInfo *tinfo) { - return tinfo->rowSize + PAYLOAD_HEADER_LEN + PAYLOAD_ID_TYPE_LEN * tinfo->numOfColumns; + return tinfo->rowSize + PAYLOAD_HEADER_LEN + PAYLOAD_COL_HEAD_LEN * tinfo->numOfColumns; } -int initSMemRowBuilder(SMemRowBuilder *pBuilder, SSchema *pSSchema, uint16_t nCols, uint16_t allNullColsLen) { +int initSMemRowHelper(SMemRowHelper *pHelper, SSchema *pSSchema, uint16_t nCols, uint16_t allNullColsLen) { ASSERT(nCols > 0); - pBuilder->pSchema = pSSchema; - pBuilder->allNullLen = allNullColsLen; // TODO: get allNullColsLen when creating or altering table meta - if (pBuilder->allNullLen == 0) { + pHelper->allNullLen = allNullColsLen; // TODO: get allNullColsLen when creating or altering table meta + if (pHelper->allNullLen == 0) { for (uint16_t i = 0; i < nCols; ++i) { uint8_t type = pSSchema[i].type; int32_t typeLen = TYPE_BYTES[type]; ASSERT(typeLen > 0); - pBuilder->allNullLen += typeLen; + pHelper->allNullLen += typeLen; if (TSDB_DATA_TYPE_BINARY == type) { - pBuilder->allNullLen += (sizeof(VarDataLenT) + CHAR_BYTES); + pHelper->allNullLen += (sizeof(VarDataLenT) + CHAR_BYTES); } else if (TSDB_DATA_TYPE_NCHAR == type) { int len = sizeof(VarDataLenT) + TSDB_NCHAR_SIZE; - pBuilder->allNullLen += len; + pHelper->allNullLen += len; } } } @@ -404,18 +403,19 @@ int32_t tsParseOneColumn(SSchema *pSchema, SStrToken *pToken, char *payload, cha return TSDB_CODE_SUCCESS; } -static FORCE_INLINE uint16_t tsSetColumnValue(char *payload, int16_t columnId, uint8_t columnType, void *value, - uint16_t valueLen) { +static FORCE_INLINE TDRowLenT tsSetPayloadColValue(char *payloadStart, char *payload, int16_t columnId, + uint8_t columnType, const void *value, uint16_t valueLen, TDRowTLenT tOffset) { payloadColSetId(payload, columnId); payloadColSetType(payload, columnType); - - memcpy(payloadColValue(payload), value, valueLen); - return PAYLOAD_ID_TYPE_LEN + valueLen; + memcpy(POINTER_SHIFT(payloadStart,tOffset), value, valueLen); + payloadSetTLen(payloadStart, payloadTLen(payloadStart) + valueLen); + return valueLen; } -static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *primaryKeyStart, char *payload, char *msg, - char **str, bool primaryKey, int16_t timePrec, TDRowLenT *sizeAppend, bool *isColNull, - TDRowLenT *dataRowColDeltaLen, TDRowLenT *kvRowColLen) { +static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *payloadStart, char *primaryKeyStart, + char *payload, char *msg, char **str, bool primaryKey, int16_t timePrec, + TDRowTLenT tOffset, TDRowLenT *sizeAppend, TDRowLenT *dataRowColDeltaLen, + TDRowLenT *kvRowColLen) { int64_t iv; int32_t ret; char * endptr = NULL; @@ -427,29 +427,30 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri switch (pSchema->type) { case TSDB_DATA_TYPE_BOOL: { // bool if (isNullStr(pToken)) { - *isColNull = true; + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, + tdGetNullVal(TSDB_DATA_TYPE_BOOL), TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset); } else { if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { if (strncmp(pToken->z, "true", pToken->n) == 0) { - *sizeAppend = - tsSetColumnValue(payload, pSchema->colId, pSchema->type, &TRUE_VALUE, TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &TRUE_VALUE, + TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); } else if (strncmp(pToken->z, "false", pToken->n) == 0) { - *sizeAppend = - tsSetColumnValue(payload, pSchema->colId, pSchema->type, &FALSE_VALUE, TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &FALSE_VALUE, + TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); } else { return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z); } } else if (pToken->type == TK_INTEGER) { iv = strtoll(pToken->z, NULL, 10); - *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, - ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, + ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); } else if (pToken->type == TK_FLOAT) { double dv = strtod(pToken->z, NULL); - *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, - ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, + ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), TYPE_BYTES[TSDB_DATA_TYPE_BOOL], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BOOL]); } else { return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z); @@ -460,7 +461,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_TINYINT: if (isNullStr(pToken)) { - *isColNull = true; + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, + tdGetNullVal(TSDB_DATA_TYPE_TINYINT), TYPE_BYTES[TSDB_DATA_TYPE_TINYINT], tOffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); if (ret != TSDB_CODE_SUCCESS) { @@ -470,8 +472,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri } uint8_t tmpVal = (uint8_t)iv; - *sizeAppend = - tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_TINYINT]); + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal, + TYPE_BYTES[TSDB_DATA_TYPE_TINYINT], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TINYINT]); } @@ -479,7 +481,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_UTINYINT: if (isNullStr(pToken)) { - *isColNull = true; + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, + tdGetNullVal(TSDB_DATA_TYPE_UTINYINT), TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT], tOffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); if (ret != TSDB_CODE_SUCCESS) { @@ -489,8 +492,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri } uint8_t tmpVal = (uint8_t)iv; - *sizeAppend = - tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT]); + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal, + TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UTINYINT]); } @@ -498,8 +501,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_SMALLINT: if (isNullStr(pToken)) { - // *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL; - *isColNull = true; + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, + tdGetNullVal(TSDB_DATA_TYPE_SMALLINT), TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT], tOffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); if (ret != TSDB_CODE_SUCCESS) { @@ -509,8 +512,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri } int16_t tmpVal = (int16_t)iv; - *sizeAppend = - tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT]); + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal, + TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_SMALLINT]); } @@ -518,7 +521,9 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_USMALLINT: if (isNullStr(pToken)) { - *isColNull = true; + *sizeAppend = + tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, + tdGetNullVal(TSDB_DATA_TYPE_USMALLINT), TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT], tOffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); if (ret != TSDB_CODE_SUCCESS) { @@ -528,8 +533,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri } uint16_t tmpVal = (uint16_t)iv; - *sizeAppend = - tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT]); + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal, + TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_USMALLINT]); } @@ -537,7 +542,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_INT: if (isNullStr(pToken)) { - *isColNull = true; + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, + tdGetNullVal(TSDB_DATA_TYPE_INT), TYPE_BYTES[TSDB_DATA_TYPE_INT], tOffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); if (ret != TSDB_CODE_SUCCESS) { @@ -547,7 +553,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri } int32_t tmpVal = (int32_t)iv; - *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_INT]); + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal, + TYPE_BYTES[TSDB_DATA_TYPE_INT], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_INT]); } @@ -555,7 +562,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_UINT: if (isNullStr(pToken)) { - *isColNull = true; + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, + tdGetNullVal(TSDB_DATA_TYPE_UINT), TYPE_BYTES[TSDB_DATA_TYPE_UINT], tOffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); if (ret != TSDB_CODE_SUCCESS) { @@ -565,8 +573,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri } uint32_t tmpVal = (uint32_t)iv; - *sizeAppend = - tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UINT]); + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal, + TYPE_BYTES[TSDB_DATA_TYPE_UINT], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UINT]); } @@ -574,7 +582,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_BIGINT: if (isNullStr(pToken)) { - *isColNull = true; + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, + tdGetNullVal(TSDB_DATA_TYPE_BIGINT), TYPE_BYTES[TSDB_DATA_TYPE_BIGINT], tOffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); if (ret != TSDB_CODE_SUCCESS) { @@ -583,14 +592,16 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z); } - *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &iv, TYPE_BYTES[TSDB_DATA_TYPE_BIGINT]); + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &iv, + TYPE_BYTES[TSDB_DATA_TYPE_BIGINT], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_BIGINT]); } break; case TSDB_DATA_TYPE_UBIGINT: if (isNullStr(pToken)) { - *isColNull = true; + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, + tdGetNullVal(TSDB_DATA_TYPE_UBIGINT), TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT], tOffset); } else { ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); if (ret != TSDB_CODE_SUCCESS) { @@ -600,15 +611,16 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri } uint64_t tmpVal = (uint64_t)iv; - *sizeAppend = - tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT]); + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal, + TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_UBIGINT]); } break; case TSDB_DATA_TYPE_FLOAT: if (isNullStr(pToken)) { - *isColNull = true; + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, + tdGetNullVal(TSDB_DATA_TYPE_FLOAT), TYPE_BYTES[TSDB_DATA_TYPE_FLOAT], tOffset); } else { double dv; if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) { @@ -621,15 +633,16 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri } float tmpVal = (float)dv; - *sizeAppend = - tsSetColumnValue(payload, pSchema->colId, pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_FLOAT]); + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &tmpVal, + TYPE_BYTES[TSDB_DATA_TYPE_FLOAT], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_FLOAT]); } break; case TSDB_DATA_TYPE_DOUBLE: if (isNullStr(pToken)) { - *isColNull = true; + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, + tdGetNullVal(TSDB_DATA_TYPE_DOUBLE), TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE], tOffset); } else { double dv; if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) { @@ -640,7 +653,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri return tscInvalidOperationMsg(msg, "illegal double data", pToken->z); } - *sizeAppend = tsSetColumnValue(payload, pSchema->colId, pSchema->type, &dv, TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE]); + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, &dv, + TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_DOUBLE]); } break; @@ -648,21 +662,22 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_BINARY: // binary data cannot be null-terminated char string, otherwise the last char of the string is lost if (pToken->type == TK_NULL) { - // setVardataNull(payload, TSDB_DATA_TYPE_BINARY); - *isColNull = true; + payloadColSetId(payload, pSchema->colId); + payloadColSetType(payload, pSchema->type); + memcpy(POINTER_SHIFT(payloadStart, tOffset), tdGetNullVal(TSDB_DATA_TYPE_BINARY), VARSTR_HEADER_SIZE + CHAR_BYTES); + *sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + CHAR_BYTES); } else { // too long values will return invalid sql, not be truncated automatically if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor return tscInvalidOperationMsg(msg, "string data overflow", pToken->z); } - // STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n); payloadColSetId(payload, pSchema->colId); payloadColSetType(payload, pSchema->type); - varDataSetLen(payloadColValue(payload), pToken->n); - memcpy(varDataVal(payloadColValue(payload)), pToken->z, pToken->n); - *sizeAppend = (TDRowLenT)(PAYLOAD_ID_TYPE_LEN + VARSTR_HEADER_SIZE + pToken->n); - *dataRowColDeltaLen += (TDRowLenT)(pToken->n - sizeof(uint8_t)); + varDataSetLen(POINTER_SHIFT(payloadStart,tOffset), pToken->n); + memcpy(varDataVal(POINTER_SHIFT(payloadStart,tOffset)), pToken->z, pToken->n); + *sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + pToken->n); + *dataRowColDeltaLen += (TDRowLenT)(pToken->n - CHAR_BYTES); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + VARSTR_HEADER_SIZE + pToken->n); } @@ -670,22 +685,25 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri case TSDB_DATA_TYPE_NCHAR: if (pToken->type == TK_NULL) { - *isColNull = true; + payloadColSetId(payload, pSchema->colId); + payloadColSetType(payload, pSchema->type); + memcpy(POINTER_SHIFT(payloadStart,tOffset), tdGetNullVal(TSDB_DATA_TYPE_NCHAR), VARSTR_HEADER_SIZE + INT_BYTES); + *sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + INT_BYTES); } else { // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' int32_t output = 0; payloadColSetId(payload, pSchema->colId); payloadColSetType(payload, pSchema->type); - if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payloadColValue(payload)), + if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(POINTER_SHIFT(payloadStart,tOffset)), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) { char buf[512] = {0}; snprintf(buf, tListLen(buf), "%s", strerror(errno)); return tscInvalidOperationMsg(msg, buf, pToken->z); } - varDataSetLen(payloadColValue(payload), output); + varDataSetLen(POINTER_SHIFT(payloadStart,tOffset), output); - *sizeAppend = (TDRowLenT)(PAYLOAD_ID_TYPE_LEN + VARSTR_HEADER_SIZE + output); + *sizeAppend = (TDRowLenT)(VARSTR_HEADER_SIZE + output); *dataRowColDeltaLen += (TDRowLenT)(output - sizeof(uint32_t)); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + VARSTR_HEADER_SIZE + output); } @@ -696,12 +714,13 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri if (primaryKey) { // When building SKVRow primaryKey, we should not skip even with NULL value. int64_t tmpVal = 0; - - *sizeAppend = tsSetColumnValue(primaryKeyStart, pSchema->colId, pSchema->type, &tmpVal, - TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]); + *sizeAppend = tsSetPayloadColValue(payloadStart, primaryKeyStart, pSchema->colId, pSchema->type, &tmpVal, + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]); } else { - *isColNull = true; + *sizeAppend = tsSetPayloadColValue(payloadStart, payload, pSchema->colId, pSchema->type, + tdGetNullVal(TSDB_DATA_TYPE_TIMESTAMP), + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], tOffset); } } else { int64_t tmpVal; @@ -709,8 +728,8 @@ static int32_t tsParseOneColumnKV(SSchema *pSchema, SStrToken *pToken, char *pri return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z); } - *sizeAppend = tsSetColumnValue(primaryKey ? primaryKeyStart : payload, pSchema->colId, pSchema->type, &tmpVal, - TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]); + *sizeAppend = tsSetPayloadColValue(payloadStart, primaryKey ? primaryKeyStart : payload, pSchema->colId, + pSchema->type, &tmpVal, TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP], tOffset); *kvRowColLen += (TDRowLenT)(sizeof(SColIdx) + TYPE_BYTES[TSDB_DATA_TYPE_TIMESTAMP]); } @@ -762,27 +781,31 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i int32_t index = 0; SStrToken sToken = {0}; - SMemRowBuilder *pBuilder = &pDataBlocks->rowBuilder; - char * payload = pDataBlocks->pData + pDataBlocks->size; + SMemRowHelper *pHelper = &pDataBlocks->rowHelper; + char * payload = pDataBlocks->pData + pDataBlocks->size; SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo; SSchema * schema = tscGetTableSchema(pDataBlocks->pTableMeta); - // 1. set the parsed value from sql string + int32_t rowSize = 0; - uint16_t rowSizeAppended = 0; - uint16_t nColsNotNull = 0; - TDRowLenT dataRowLen = pBuilder->allNullLen; - TDRowLenT kvRowLen = TD_MEM_ROW_KV_VER_SIZE; - + int32_t dataRowLen = pHelper->allNullLen; + int32_t kvRowLen = TD_MEM_ROW_KV_VER_SIZE; + TDRowTLenT payloadValOffset = 0; + TDRowLenT colValOffset = 0; ASSERT(dataRowLen > 0); + + payloadSetNCols(payload, spd->numOfBound); + payloadValOffset = payloadValuesOffset(payload); // rely on payloadNCols + payloadSetTLen(payload, payloadValOffset); - char *kvPrimayKeyStart = payload + PAYLOAD_HEADER_LEN; // primaryKey in 1st column tuple - char *kvStart = kvPrimayKeyStart + PAYLOAD_PRIMARY_COL_LEN; // the column tuple behind the primaryKey + char *kvPrimaryKeyStart = payload + PAYLOAD_HEADER_LEN; // primaryKey in 1st column tuple + char *kvStart = kvPrimaryKeyStart + PAYLOAD_COL_HEAD_LEN; // the column tuple behind the primaryKey + // 1. set the parsed value from sql string for (int i = 0; i < spd->numOfBound; ++i) { // the start position in data block buffer of current value in sql - int32_t colIndex = spd->boundedColumns[i]; // ordered + int32_t colIndex = spd->boundedColumns[i]; char *start = payload + spd->cols[colIndex].offset; @@ -845,43 +868,61 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i } bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX); - bool isColNull = false; TDRowLenT dataRowDeltaColLen = 0; // When combine the data as SDataRow, the delta len between all NULL columns. TDRowLenT kvRowColLen = 0; - TDRowLenT colSizeAppended = 0; - // make sure the Primarykey locates in the 1st column - int32_t ret = tsParseOneColumnKV(pSchema, &sToken, kvPrimayKeyStart, kvStart, pInsertParam->msg, str, isPrimaryKey, - timePrec, &colSizeAppended, &isColNull, &dataRowDeltaColLen, &kvRowColLen); + TDRowLenT colValAppended = 0; + + if(!spd->isOrdered) { + ASSERT(spd->colIdxInfo != NULL); + if(!isPrimaryKey) { + kvStart = POINTER_SHIFT(kvPrimaryKeyStart, spd->colIdxInfo[i].finalIdx * PAYLOAD_COL_HEAD_LEN); + } else { + ASSERT(spd->colIdxInfo[i].finalIdx == 0); + } + } else { + ASSERT(spd->colIdxInfo == NULL); + } + // the primary key locates in 1st column + int32_t ret = tsParseOneColumnKV(pSchema, &sToken, payload, kvPrimaryKeyStart, kvStart, pInsertParam->msg, str, + isPrimaryKey, timePrec, payloadValOffset + colValOffset, &colValAppended, + &dataRowDeltaColLen, &kvRowColLen); if (ret != TSDB_CODE_SUCCESS) { return ret; } - if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, payloadColValue(kvPrimayKeyStart)) != TSDB_CODE_SUCCESS) { - tscInvalidOperationMsg(pInsertParam->msg, "client time/server time can not be mixed up", sToken.z); - return TSDB_CODE_TSC_INVALID_TIME_STAMP; - } - if (isColNull == false) { - ++nColsNotNull; + if (isPrimaryKey) { + if (tsCheckTimestamp(pDataBlocks, payloadValues(payload)) != TSDB_CODE_SUCCESS) { + tscInvalidOperationMsg(pInsertParam->msg, "client time/server time can not be mixed up", sToken.z); + return TSDB_CODE_TSC_INVALID_TIME_STAMP; + } + payloadColSetOffset(kvPrimaryKeyStart, colValOffset); + } else { + payloadColSetOffset(kvStart, colValOffset); + if(spd->isOrdered) { + kvStart += PAYLOAD_COL_HEAD_LEN; // move to next column + } } + + colValOffset += colValAppended; kvRowLen += kvRowColLen; dataRowLen += dataRowDeltaColLen; - if (!isPrimaryKey) { - kvStart += colSizeAppended; // move to next column - } - rowSizeAppended += colSizeAppended; // calculate rowLen } - if (kvRowLen < dataRowLen) { + if (kvRowLen < dataRowLen * KVRowRatio) { payloadSetType(payload, SMEM_ROW_KV); } else { payloadSetType(payload, SMEM_ROW_DATA); } - *len = PAYLOAD_HEADER_LEN + rowSizeAppended; - - payloadSetNCols(payload, nColsNotNull); + ASSERT(colValOffset <= TSDB_MAX_BYTES_PER_ROW); + + *len = (int32_t)(payloadValOffset + colValOffset); payloadSetTLen(payload, *len); + // TSKEY tsKey = payloadKey(payload); + // ASSERT((tsKey < 1627747200000000 && tsKey > 1498838400000000) || (tsKey < 1627747200000 && tsKey > 1498838400000) || + // (tsKey < 1627747200 && tsKey > 1498838400)); + return TSDB_CODE_SUCCESS; } @@ -895,6 +936,27 @@ static int32_t rowDataCompar(const void *lhs, const void *rhs) { return left > right ? 1 : -1; } } +static int32_t schemaIdxCompar(const void *lhs, const void *rhs) { + uint16_t left = *(uint16_t *)lhs; + uint16_t right = *(uint16_t *)rhs; + + if (left == right) { + return 0; + } else { + return left > right ? 1 : -1; + } +} + +static int32_t boundIdxCompar(const void *lhs, const void *rhs) { + uint16_t left = *(uint16_t *)POINTER_SHIFT(lhs, sizeof(uint16_t)); + uint16_t right = *(uint16_t *)POINTER_SHIFT(rhs, sizeof(uint16_t)); + + if (left == right) { + return 0; + } else { + return left > right ? 1 : -1; + } +} int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SInsertStatementParam *pInsertParam, int32_t* numOfRows, char *tmpTokenBuf) { @@ -912,8 +974,8 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn int32_t extendedRowSize = getExtendedRowSize(&tinfo); - initSMemRowBuilder(&pDataBlock->rowBuilder, tscGetTableSchema(pDataBlock->pTableMeta), - tscGetNumOfColumns(pDataBlock->pTableMeta), 0); + initSMemRowHelper(&pDataBlock->rowHelper, tscGetTableSchema(pDataBlock->pTableMeta), + tscGetNumOfColumns(pDataBlock->pTableMeta), 0); while (1) { index = 0; @@ -965,9 +1027,10 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols) { pColInfo->numOfCols = numOfCols; pColInfo->numOfBound = numOfCols; - + pColInfo->isOrdered = true; pColInfo->boundedColumns = calloc(pColInfo->numOfCols, sizeof(int32_t)); pColInfo->cols = calloc(pColInfo->numOfCols, sizeof(SBoundColumn)); + pColInfo->colIdxInfo = NULL; for (int32_t i = 0; i < pColInfo->numOfCols; ++i) { if (i > 0) { @@ -1093,14 +1156,23 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk SBlockKeyTuple *pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; char * pBlockData = pBlocks->data; - uint32_t totolPayloadLen = 0; - TDRowLenT payloadTLen = 0; + TDRowTLenT totolPayloadTLen = 0; + TDRowTLenT payloadTLen = 0; int n = 0; while (n < nRows) { pBlkKeyTuple->skey = payloadKey(pBlockData); pBlkKeyTuple->payloadAddr = pBlockData; payloadTLen = payloadTLen(pBlockData); - totolPayloadLen += payloadTLen; + + ASSERT(payloadNCols(pBlockData) <= 4096); + ASSERT(payloadTLen(pBlockData) < 65536); + ASSERT(pBlkKeyTuple->payloadAddr != NULL); + + ASSERT((pBlkKeyTuple->skey < 1627747200000000 && pBlkKeyTuple->skey > 1498838400000000) || + (pBlkKeyTuple->skey < 1627747200000 && pBlkKeyTuple->skey > 1498838400000) || + (pBlkKeyTuple->skey < 1627747200 && pBlkKeyTuple->skey > 1498838400)); + + totolPayloadTLen += payloadTLen; // next loop pBlockData += payloadTLen; ++pBlkKeyTuple; @@ -1119,23 +1191,36 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk TSKEY tj = (pBlkKeyTuple + j)->skey; if (ti == tj) { - totolPayloadLen -= payloadTLen(pBlkKeyTuple + j); + totolPayloadTLen -= payloadTLen(pBlkKeyTuple + j); ++j; continue; } int32_t nextPos = (++i); if (nextPos != j) { - memmove(pBlkKeyTuple + sizeof(SBlockKeyTuple) * nextPos, pBlkKeyTuple + sizeof(SBlockKeyTuple) * j, sizeof(SBlockKeyTuple)); + memmove(pBlkKeyTuple + nextPos, pBlkKeyTuple + j, sizeof(SBlockKeyTuple)); } ++j; } dataBuf->ordered = true; pBlocks->numOfRows = i + 1; + + ASSERT(pBlocks->numOfRows <= nRows); + + int tt = 0; + pBlkKeyTuple = pBlkKeyInfo->pKeyTuple; + while (tt < pBlocks->numOfRows) { + ASSERT(pBlkKeyTuple->payloadAddr != NULL); + ASSERT((pBlkKeyTuple->skey < 1627747200000000 && pBlkKeyTuple->skey > 1498838400000000) || + (pBlkKeyTuple->skey < 1627747200000 && pBlkKeyTuple->skey > 1498838400000) || + (pBlkKeyTuple->skey < 1627747200 && pBlkKeyTuple->skey > 1498838400)); + ++pBlkKeyTuple; + ++tt; + } } - - dataBuf->size = sizeof(SSubmitBlk) + totolPayloadLen; + + dataBuf->size = sizeof(SSubmitBlk) + totolPayloadTLen; dataBuf->prevTS = INT64_MIN; return 0; @@ -1475,7 +1560,6 @@ static int32_t validateDataSource(SInsertStatementParam *pInsertParam, int32_t t static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDataColInfo* pColInfo, SSchema* pSchema, char* str, char **end) { pColInfo->numOfBound = 0; - memset(pColInfo->boundedColumns, 0, sizeof(int32_t) * pColInfo->numOfCols); for(int32_t i = 0; i < pColInfo->numOfCols; ++i) { pColInfo->cols[i].hasVal = false; @@ -1483,7 +1567,7 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat int32_t code = TSDB_CODE_SUCCESS; - int32_t index = 0; + int32_t index = 0; SStrToken sToken = tStrGetToken(str, &index, false); str += index; @@ -1491,7 +1575,8 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat code = tscSQLSyntaxErrMsg(pInsertParam->msg, "( is expected", sToken.z); goto _clean; } - + bool isOrdered = true; + int32_t lastColIdx = -1; while (1) { index = 0; sToken = tStrGetToken(str, &index, false); @@ -1523,6 +1608,14 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat pColInfo->boundedColumns[pColInfo->numOfBound] = t; pColInfo->numOfBound += 1; findColumnIndex = true; + + if (isOrdered) { + if (lastColIdx > t) { + isOrdered = false; + } else { + lastColIdx = t; + } + } break; } } @@ -1533,10 +1626,32 @@ static int32_t parseBoundColumns(SInsertStatementParam *pInsertParam, SParsedDat } } - memset(&pColInfo->boundedColumns[pColInfo->numOfBound], 0 , sizeof(int32_t) * (pColInfo->numOfCols - pColInfo->numOfBound)); + pColInfo->isOrdered = isOrdered; + + if (!isOrdered) { + pColInfo->colIdxInfo = calloc(pColInfo->numOfBound, sizeof(SBoundIdxInfo)); + if (pColInfo->colIdxInfo == NULL) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto _clean; + } + SBoundIdxInfo *pColIdx = pColInfo->colIdxInfo; + for (int i = 0; i < pColInfo->numOfBound; ++i) { + pColIdx[i].schemaColIdx = (uint16_t)pColInfo->boundedColumns[i]; + pColIdx[i].boundIdx = (uint16_t)i; + } + qsort(pColIdx, pColInfo->numOfBound, sizeof(SBoundIdxInfo), schemaIdxCompar); + for (int i = 0; i < pColInfo->numOfBound; ++i) { + pColIdx[i].finalIdx = (uint16_t)i; + } + qsort(pColIdx, pColInfo->numOfBound, sizeof(SBoundIdxInfo), boundIdxCompar); + } + + memset(&pColInfo->boundedColumns[pColInfo->numOfBound], 0, + sizeof(int32_t) * (pColInfo->numOfCols - pColInfo->numOfBound)); + return TSDB_CODE_SUCCESS; - _clean: +_clean: pInsertParam->sql = NULL; return code; } @@ -1944,8 +2059,8 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow goto _error; } - initSMemRowBuilder(&pTableDataBlock->rowBuilder, tscGetTableSchema(pTableDataBlock->pTableMeta), - tscGetNumOfColumns(pTableDataBlock->pTableMeta), 0); + initSMemRowHelper(&pTableDataBlock->rowHelper, tscGetTableSchema(pTableDataBlock->pTableMeta), + tscGetNumOfColumns(pTableDataBlock->pTableMeta), 0); while ((readLen = tgetline(&line, &n, fp)) != -1) { if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) { diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 729fae3bfa..9246b86ea6 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1433,6 +1433,7 @@ void tscFreeSqlObj(SSqlObj* pSql) { void tscDestroyBoundColumnInfo(SParsedDataColInfo* pColInfo) { tfree(pColInfo->boundedColumns); tfree(pColInfo->cols); + tfree(pColInfo->colIdxInfo); } void tscDestroyDataBlock(STableDataBlocks* pDataBlock, bool removeMeta) { @@ -1646,51 +1647,54 @@ int32_t tscGetDataBlockFromList(SHashObj* pHashList, int64_t id, int32_t size, i return TSDB_CODE_SUCCESS; } - - - static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { SSchema* pSchema = pBuilder->pSchema; char* p = (char*)pBuilder->buf; int toffset = 0; uint16_t nCols = pBuilder->nCols; -// RawRow payload structure: -// |<---------- header ------------->|<------- column data array ------->| -// |SMemRowType| dataLen | nCols | colId | colType | value |...|...| -// +-----------+----------+----------+---------------------------------->| -// | uint8_t | uint16_t | uint16_t | int16_t | uint8_t | ??? |...|...| -// +-----------+----------+----------+---------------------------------->| uint8_t memRowType = payloadType(p); - uint16_t nColsNotNull = payloadNCols(p); - if (pBuilder->nCols <= 0 || nColsNotNull <= 0) { + uint16_t nColsBound = payloadNCols(p); + if (pBuilder->nCols <= 0 || nColsBound <= 0) { return NULL; } - ASSERT(nColsNotNull <= nCols); - + char* pVals = POINTER_SHIFT(p, payloadValuesOffset(p)); SMemRow* memRow = (SMemRow)pBuilder->pDataBlock; memRowSetType(memRow, memRowType); + // ----------------- Raw payload structure for row: + /* |<------------ Head ------------->|<----------- body of column data tuple ------------------->| + * | |<----------------- flen ------------->|<--- value part --->| + * |SMemRowType| dataTLen | nCols | colId | colType | offset | ... | value |...|...|... | + * +-----------+----------+----------+--------------------------------------|--------------------| + * | uint8_t | uint32_t | uint16_t | int16_t | uint8_t | uint16_t | ... |.......|...|...|... | + * +-----------+----------+----------+--------------------------------------+--------------------| + * 1. offset in column data tuple starts from the value part in case of uint16_t overflow. + * 2. dataTLen: total length including the header and body. + */ + if (memRowType == SMEM_ROW_DATA) { + ASSERT(nColsBound <= nCols); SDataRow trow = (SDataRow)memRowDataBody(memRow); - dataRowSetLen(trow, (uint16_t)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen)); + dataRowSetLen(trow, (TDRowLenT)(TD_DATA_ROW_HEAD_SIZE + pBuilder->flen)); dataRowSetVersion(trow, pBuilder->sversion); p = (char*)payloadBody(pBuilder->buf); uint16_t i = 0, j = 0; - while (j < pBuilder->nCols) { - if (i >= nColsNotNull) { + while (j < nCols) { + if (i >= nColsBound) { break; } - int16_t colId = *(int16_t*)p; + int16_t colId = payloadColId(p); if (colId == pSchema[j].colId) { - tdAppendColVal(trow, payloadColValue(p), pSchema[j].type, toffset); + // ASSERT(payloadColType(p) == pSchema[j].type); + tdAppendColVal(trow, POINTER_SHIFT(pVals, payloadColOffset(p)), pSchema[j].type, toffset); toffset += TYPE_BYTES[pSchema[j].type]; - p = skipToNextEles(p); + p = payloadNextCol(p); ++i; ++j; } else if (colId < pSchema[j].colId) { - p = skipToNextEles(p); + p = payloadNextCol(p); ++i; } else { tdAppendColVal(trow, tdGetNullVal(pSchema[j].type), pSchema[j].type, toffset); @@ -1699,41 +1703,43 @@ static SMemRow tdGenMemRowFromBuilder(SMemRowBuilder* pBuilder) { } } - while (j < pBuilder->nCols) { + while (j < nCols) { tdAppendColVal(trow, tdGetNullVal(pSchema[j].type), pSchema[j].type, toffset); toffset += TYPE_BYTES[pSchema[j].type]; ++j; } - while (i < nColsNotNull) { - p = skipToNextEles(p); + + #if 0 // no need anymore + while (i < nColsBound) { + p = payloadNextCol(p); ++i; } + #endif } else if (memRowType == SMEM_ROW_KV) { - ASSERT(nColsNotNull <= pBuilder->nCols); + ASSERT(nColsBound <= nCols); SKVRow kvRow = (SKVRow)memRowKvBody(memRow); - uint16_t tlen = TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nColsNotNull; - kvRowSetLen(kvRow, tlen); - kvRowSetNCols(kvRow, nColsNotNull); - memRowKvSetVersion(memRow, pBuilder->sversion); + kvRowSetLen(kvRow, (TDRowLenT)(TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * nColsBound)); + kvRowSetNCols(kvRow, nColsBound); + memRowSetKvVersion(memRow, pBuilder->sversion); p = (char*)payloadBody(pBuilder->buf); int i = 0; - while (i < nColsNotNull) { + while (i < nColsBound) { int16_t colId = payloadColId(p); uint8_t colType = payloadColType(p); - tdAppendKvColVal(kvRow, payloadColValue(p), colId, colType, toffset); + tdAppendKvColVal(kvRow, POINTER_SHIFT(pVals,payloadColOffset(p)), colId, colType, toffset); toffset += sizeof(SColIdx); - p = skipToNextEles(p); + p = payloadNextCol(p); ++i; } } else { ASSERT(0); } - - pBuilder->pDataBlock = (char*)pBuilder->pDataBlock + memRowTLen(memRow); // next row - pBuilder->pSubmitBlk->dataLen += memRowTLen(memRow); + int32_t rowTLen = memRowTLen(memRow); + pBuilder->pDataBlock = (char*)pBuilder->pDataBlock + rowTLen; // next row + pBuilder->pSubmitBlk->dataLen += rowTLen; return memRow; } @@ -1744,7 +1750,6 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo STableMeta* pTableMeta = pTableDataBlock->pTableMeta; STableComInfo tinfo = tscGetTableInfo(pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMeta); - SMemRowBuilder* pBuilder = &pTableDataBlock->rowBuilder; SSubmitBlk* pBlock = pDataBlock; memcpy(pDataBlock, pTableDataBlock->pData, sizeof(SSubmitBlk)); @@ -1780,18 +1785,19 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo pBlock->dataLen = 0; int32_t numOfRows = htons(pBlock->numOfRows); - pBuilder->pSchema = pSchema; - pBuilder->sversion = pTableMeta->sversion; - pBuilder->flen = flen; - pBuilder->nCols = tinfo.numOfColumns; - pBuilder->pDataBlock = pDataBlock; - pBuilder->pSubmitBlk = pBlock; - pBuilder->buf = p; - pBuilder->size = 0; + SMemRowBuilder rowBuilder; + rowBuilder.pSchema = pSchema; + rowBuilder.sversion = pTableMeta->sversion; + rowBuilder.flen = flen; + rowBuilder.nCols = tinfo.numOfColumns; + rowBuilder.pDataBlock = pDataBlock; + rowBuilder.pSubmitBlk = pBlock; + rowBuilder.buf = p; + rowBuilder.size = 0; for (int32_t i = 0; i < numOfRows; ++i) { - pBuilder->buf = (blkKeyTuple+i)->payloadAddr; - tdGenMemRowFromBuilder(pBuilder); + rowBuilder.buf = (blkKeyTuple + i)->payloadAddr; + tdGenMemRowFromBuilder(&rowBuilder); } int32_t len = pBlock->dataLen + pBlock->schemaLen; @@ -1803,7 +1809,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, bo static int32_t getRowExpandSize(STableMeta* pTableMeta) { // add prefix len of KV type SMemRow(we may use SDataRow or SKVRow) - int32_t result = TD_DATA_ROW_HEAD_SIZE + TD_MEM_ROW_KV_TYPE_VER_SIZE; + int32_t result = TD_MEM_ROW_DATA_HEAD_SIZE; int32_t columns = tscGetNumOfColumns(pTableMeta); SSchema* pSchema = tscGetTableSchema(pTableMeta); for(int32_t i = 0; i < columns; i++) { diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index 093b7c5188..49353115ae 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -186,18 +186,6 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) { return 0; } } -/* A memory data row, the format is like below: - *|---------+---------------------+--------------------------- len ---------------------------------->| - *|<- type->|<-- Head -->|<--------- flen -------------->| | - *|---------+---------------------+---------------------------------+---------------------------------+ - *| uint8_t | uint16_t | int16_t | | | - *|---------+----------+----------+---------------------------------+---------------------------------+ - *| flag | len | sversion | First part | Second part | - *|---------+----------+----------+---------------------------------+---------------------------------+ - * - * NOTE: timestamp in this row structure is TKEY instead of TSKEY - */ -typedef void *SMemRow; // ----------------- Data row structure @@ -216,7 +204,7 @@ typedef void *SDataRow; #define TD_DATA_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t)) -#define dataRowLen(r) (*(uint16_t *)(r)) +#define dataRowLen(r) (*(TDRowLenT *)(r)) // 0~65535 #define dataRowVersion(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(int16_t))) #define dataRowTuple(r) POINTER_SHIFT(r, TD_DATA_ROW_HEAD_SIZE) #define dataRowTKey(r) (*(TKEY *)(dataRowTuple(r))) @@ -231,7 +219,6 @@ SDataRow tdNewDataRowFromSchema(STSchema *pSchema); void tdFreeDataRow(SDataRow row); void tdInitDataRow(SDataRow row, STSchema *pSchema); SDataRow tdDataRowDup(SDataRow row); -SMemRow tdMemRowDup(SMemRow row); // offset here not include dataRow header length static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t type, int32_t offset) { @@ -247,7 +234,7 @@ static FORCE_INLINE int tdAppendColVal(SDataRow row, const void *value, int8_t t if (offset == 0) { ASSERT(type == TSDB_DATA_TYPE_TIMESTAMP); TKEY tvalue = tdGetTKEY(*(TSKEY *)value); - memcpy(POINTER_SHIFT(row, toffset), (void *)(&tvalue), TYPE_BYTES[type]); + memcpy(POINTER_SHIFT(row, toffset), (const void *)(&tvalue), TYPE_BYTES[type]); } else { memcpy(POINTER_SHIFT(row, toffset), value, TYPE_BYTES[type]); } @@ -287,7 +274,7 @@ void dataColSetOffset(SDataCol *pCol, int nEle); bool isNEleNull(SDataCol *pCol, int nEle); void dataColSetNEleNull(SDataCol *pCol, int nEle, int maxPoints); -static FORCE_INLINE const void *tdGetNullVal(int8_t type) { +FORCE_INLINE const void *tdGetNullVal(int8_t type) { switch (type) { case TSDB_DATA_TYPE_BOOL: return &BoolNull; @@ -400,11 +387,11 @@ void tdResetDataCols(SDataCols *pCols); int tdInitDataCols(SDataCols *pCols, STSchema *pSchema); SDataCols *tdDupDataCols(SDataCols *pCols, bool keepData); SDataCols *tdFreeDataCols(SDataCols *pCols); -void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols); int tdMergeDataCols(SDataCols *target, SDataCols *source, int rowsToMerge, int *pOffset); // ----------------- K-V data row structure -/* +/* |<-------------------------------------- len -------------------------------------------->| + * |<----- header ----->|<--------------------------- body -------------------------------->| * +----------+----------+---------------------------------+---------------------------------+ * | uint16_t | int16_t | | | * +----------+----------+---------------------------------+---------------------------------+ @@ -420,7 +407,7 @@ typedef struct { #define TD_KV_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t)) -#define kvRowLen(r) (*(uint16_t *)(r)) +#define kvRowLen(r) (*(TDRowLenT *)(r)) #define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(uint16_t))) #define kvRowSetLen(r, len) kvRowLen(r) = (len) #define kvRowSetNCols(r, n) kvRowNCols(r) = (n) @@ -532,7 +519,7 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, // ----------------- SMemRow appended with sequential data row structure /* - * |-------------------------------+--------------------------- len ---------------------------------->| + * |---------|------------------------------------------------- len ---------------------------------->| * |<-------- Head ------>|<--------- flen -------------->| | * |---------+---------------------+---------------------------------+---------------------------------+ * | uint8_t | uint16_t | int16_t | | | @@ -544,7 +531,8 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, */ // ----------------- SMemRow appended with extended K-V data row structure -/* | +/* |--------------------|------------------------------------------------ len ---------------------------------->| + * |<------------- Head ------------>|<--------- flen -------------->| | * |--------------------+----------+--------------------------------------------+---------------------------------+ * | uint8_t | int16_t | uint16_t | int16_t | | | * |---------+----------+----------+----------+---------------------------------+---------------------------------+ @@ -552,11 +540,13 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, * |---------+----------+----------+----------+---------------------------------+---------------------------------+ */ +typedef void *SMemRow; + #define TD_MEM_ROW_TYPE_SIZE sizeof(uint8_t) #define TD_MEM_ROW_KV_VER_SIZE sizeof(int16_t) #define TD_MEM_ROW_KV_TYPE_VER_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE) #define TD_MEM_ROW_DATA_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_DATA_ROW_HEAD_SIZE) -#define TD_MEM_ROW_KV_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE + TD_KV_ROW_HEAD_SIZE) +// #define TD_MEM_ROW_KV_HEAD_SIZE (TD_MEM_ROW_TYPE_SIZE + TD_MEM_ROW_KV_VER_SIZE + TD_KV_ROW_HEAD_SIZE) #define SMEM_ROW_DATA 0U // SDataRow #define SMEM_ROW_KV 1U // SKVRow @@ -567,21 +557,22 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, #define memRowDataBody(r) POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE) // section after flag #define memRowKvBody(r) \ - POINTER_SHIFT(r, TD_MEM_ROW_KV_TYPE_VER_SIZE) // section after flag + sversion as to reuse of SKVRow -// #define memRowBody(r) (isDataRow(r) ? memRowDataBody(r) : memRowKvBody(r)) + POINTER_SHIFT(r, TD_MEM_ROW_KV_TYPE_VER_SIZE) // section after flag + sversion as to reuse SKVRow -#define memRowDataLen(r) (*(TDRowLenT *)memRowDataBody(r)) -#define memRowKvLen(r) (*(TDRowLenT *)memRowKvBody(r)) -#define memRowDataTLen(r) (memRowDataLen(r) + (TDRowLenT)TD_MEM_ROW_TYPE_SIZE) -#define memRowKvTLen(r) (memRowKvLen(r) + (TDRowLenT)TD_MEM_ROW_KV_TYPE_VER_SIZE) +#define memRowDataLen(r) (*(TDRowLenT *)memRowDataBody(r)) // 0~65535 +#define memRowKvLen(r) (*(TDRowLenT *)memRowKvBody(r)) // 0~65535 + +#define memRowDataTLen(r) (memRowDataLen(r) + TD_MEM_ROW_TYPE_SIZE) // using uint32_t/int32_t to store the TLen + +#define memRowKvTLen(r) (memRowKvLen(r) + TD_MEM_ROW_KV_TYPE_VER_SIZE) #define memRowLen(r) (isDataRow(r) ? memRowDataLen(r) : memRowKvLen(r)) -#define memRowTLen(r) (isDataRow(r) ? memRowDataTLen(r) : memRowKvTLen(r)) +#define memRowTLen(r) (isDataRow(r) ? memRowDataTLen(r) : memRowKvTLen(r)) // using uint32_t/int32_t to store the TLen #define memRowDataVersion(r) dataRowVersion(memRowDataBody(r)) #define memRowKvVersion(r) (*(int16_t *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) #define memRowVersion(r) (isDataRow(r) ? memRowDataVersion(r) : memRowKvVersion(r)) // schema version -#define memRowKvSetVersion(r, v) (memRowKvVersion(r) = (v)) +#define memRowSetKvVersion(r, v) (memRowKvVersion(r) = (v)) #define memRowTuple(r) (isDataRow(r) ? dataRowTuple(memRowDataBody(r)) : kvRowValues(memRowKvBody(r))) #define memRowTKey(r) (isDataRow(r) ? dataRowTKey(memRowDataBody(r)) : kvRowTKey(memRowKvBody(r))) @@ -594,6 +585,8 @@ static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, #define memRowMaxBytesFromSchema(s) (schemaTLen(s) + TD_MEM_ROW_DATA_HEAD_SIZE) #define memRowDeleted(r) TKEY_IS_DELETED(memRowTKey(r)) +SMemRow tdMemRowDup(SMemRow row); +void tdAppendMemRowToDataCol(SMemRow row, STSchema *pSchema, SDataCols *pCols); // NOTE: offset here including the header size static FORCE_INLINE void *tdGetKvRowDataOfCol(void *row, int32_t offset) { return POINTER_SHIFT(row, offset); } // NOTE: offset here including the header size @@ -608,50 +601,52 @@ static FORCE_INLINE void *tdGetMemRowDataOfCol(void *row, int8_t type, int32_t o return NULL; } -// RawRow payload structure: -// |<---------- header ------------->|<---- body: column data tuple ---->| -// |SMemRowType| dataLen | nCols | colId | colType | value |...|...| -// +-----------+----------+----------+---------------------------------->| -// | uint8_t | uint16_t | uint16_t | int16_t | uint8_t | ??? |...|...| -// +-----------+----------+----------+---------------------------------->| +// ----------------- Raw payload structure for row: +/* |<------------ Head ------------->|<----------- body of column data tuple ------------------->| + * | |<----------------- flen ------------->|<--- value part --->| + * |SMemRowType| dataTLen | nCols | colId | colType | offset | ... | value |...|...|... | + * +-----------+----------+----------+--------------------------------------|--------------------| + * | uint8_t | uint32_t | uint16_t | int16_t | uint8_t | uint16_t | ... |.......|...|...|... | + * +-----------+----------+----------+--------------------------------------+--------------------| + * 1. offset in column data tuple starts from the value part in case of uint16_t overflow. + * 2. dataTLen: total length including the header and body. + */ #define PAYLOAD_NCOLS_LEN sizeof(uint16_t) -#define PAYLOAD_NCOLS_OFFSET (sizeof(uint8_t) + sizeof(TDRowLenT)) +#define PAYLOAD_NCOLS_OFFSET (sizeof(uint8_t) + sizeof(TDRowTLenT)) #define PAYLOAD_HEADER_LEN (PAYLOAD_NCOLS_OFFSET + PAYLOAD_NCOLS_LEN) #define PAYLOAD_ID_LEN sizeof(int16_t) #define PAYLOAD_ID_TYPE_LEN (sizeof(int16_t) + sizeof(uint8_t)) +#define PAYLOAD_COL_HEAD_LEN (PAYLOAD_ID_TYPE_LEN + sizeof(uint16_t)) #define PAYLOAD_PRIMARY_COL_LEN (PAYLOAD_ID_TYPE_LEN + sizeof(TSKEY)) #define payloadBody(r) POINTER_SHIFT(r, PAYLOAD_HEADER_LEN) #define payloadType(r) (*(uint8_t *)(r)) #define payloadSetType(r, t) (payloadType(r) = (t)) -#define payloadTLen(r) (*(TDRowLenT *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) // including total header +#define payloadTLen(r) (*(TDRowTLenT *)POINTER_SHIFT(r, TD_MEM_ROW_TYPE_SIZE)) // including total header #define payloadSetTLen(r, l) (payloadTLen(r) = (l)) #define payloadNCols(r) (*(TDRowLenT *)POINTER_SHIFT(r, PAYLOAD_NCOLS_OFFSET)) #define payloadSetNCols(r, n) (payloadNCols(r) = (n)) +#define payloadValuesOffset(r) \ + (PAYLOAD_HEADER_LEN + payloadNCols(r) * PAYLOAD_COL_HEAD_LEN) // avoid using the macro in loop +#define payloadValues(r) POINTER_SHIFT(r, payloadValuesOffset(r)) // avoid using the macro in loop +#define payloadColId(c) (*(int16_t *)(c)) +#define payloadColType(c) (*(uint8_t *)POINTER_SHIFT(c, PAYLOAD_ID_LEN)) +#define payloadColOffset(c) (*(uint16_t *)POINTER_SHIFT(c, PAYLOAD_ID_TYPE_LEN)) +#define payloadColValue(c) POINTER_SHIFT(c, payloadColOffset(c)) -#define payloadColId(r) (*(int16_t *)(r)) -#define payloadColType(r) (*(uint8_t *)POINTER_SHIFT(r, PAYLOAD_ID_LEN)) -#define payloadColValue(r) POINTER_SHIFT(r, PAYLOAD_ID_TYPE_LEN) +#define payloadColSetId(c, i) (payloadColId(c) = (i)) +#define payloadColSetType(c, t) (payloadColType(c) = (t)) +#define payloadColSetOffset(c, o) (payloadColOffset(c) = (o)) -#define payloadColSetId(r, i) (payloadColId(r) = (i)) -#define payloadColSetType(r, t) (payloadColType(r) = (t)) - -#define payloadKeyAddr(r) POINTER_SHIFT(r, PAYLOAD_HEADER_LEN + PAYLOAD_ID_TYPE_LEN) -#define payloadTKey(r) (*(TKEY *)(payloadKeyAddr(r))) +#define payloadKeyOffset(r) (*(uint16_t *)POINTER_SHIFT(r, PAYLOAD_HEADER_LEN + PAYLOAD_ID_TYPE_LEN)) +#define payloadTKey(r) (*(TKEY *)POINTER_SHIFT(r, payloadValuesOffset(r) + payloadKeyOffset(r))) #define payloadKey(r) tdGetKey(payloadTKey(r)) -static FORCE_INLINE char *skipToNextEles(char *p) { - uint8_t colType = payloadColType(p); - if (IS_VAR_DATA_TYPE(colType)) { - return (char *)POINTER_SHIFT(p, PAYLOAD_ID_TYPE_LEN + varDataTLen(payloadColValue(p))); - } else { - return (char *)POINTER_SHIFT(p, PAYLOAD_ID_TYPE_LEN + TYPE_BYTES[colType]); - } -} +static FORCE_INLINE char *payloadNextCol(char *pCol) { return (char *)POINTER_SHIFT(pCol, PAYLOAD_COL_HEAD_LEN); } #ifdef __cplusplus } #endif -#endif // _TD_DATA_FORMAT_H_ +#endif // _TD_DATA_FORMAT_H_ \ No newline at end of file diff --git a/src/inc/taosdef.h b/src/inc/taosdef.h index 6fa9a41f1f..9ab2542eb9 100644 --- a/src/inc/taosdef.h +++ b/src/inc/taosdef.h @@ -195,7 +195,11 @@ do { \ #define TSDB_APPNAME_LEN TSDB_UNI_LEN -#define TSDB_MAX_BYTES_PER_ROW 65536 + /** + * Don't change to 65536. As in some scenarios uint16_t (0~65535) is used to store the row len. + * Finally, we use 65531(65535 - 4), as the SDataRow and SKVRow including 4 bits header. + */ +#define TSDB_MAX_BYTES_PER_ROW 65531 #define TSDB_MAX_TAGS_LEN 16384 #define TSDB_MAX_TAGS 128 #define TSDB_MAX_TAG_CONDITIONS 1024 diff --git a/src/inc/ttype.h b/src/inc/ttype.h index 0900f45350..2581589563 100644 --- a/src/inc/ttype.h +++ b/src/inc/ttype.h @@ -10,9 +10,10 @@ extern "C" { #include "taosdef.h" // ----------------- For variable data types such as TSDB_DATA_TYPE_BINARY and TSDB_DATA_TYPE_NCHAR -typedef int32_t VarDataOffsetT; -typedef int16_t VarDataLenT; // maxVarDataLen: 32767 -typedef uint16_t TDRowLenT; +typedef int32_t VarDataOffsetT; +typedef int16_t VarDataLenT; // maxVarDataLen: 32767 +typedef uint16_t TDRowLenT; // not including overhead: 0 ~ 65535 +typedef uint32_t TDRowTLenT; // total length, including overhead typedef struct tstr { VarDataLenT len; diff --git a/src/tsdb/src/tsdbMemTable.c b/src/tsdb/src/tsdbMemTable.c index ad097ec0f5..d62595d96f 100644 --- a/src/tsdb/src/tsdbMemTable.c +++ b/src/tsdb/src/tsdbMemTable.c @@ -767,7 +767,7 @@ static int tsdbCopyRowToMem(STsdbRepo *pRepo, SMemRow row, STable *pTable, void void *pRow = tsdbAllocBytes(pRepo, memRowTLen(row)); if (pRow == NULL) { - tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %d bytes since %s", + tsdbError("vgId:%d failed to insert row with key %" PRId64 " to table %s while allocate %" PRIu64 " bytes since %s", REPO_ID(pRepo), key, TABLE_CHAR_NAME(pTable), memRowTLen(row), tstrerror(terrno)); return -1; }