From 1c0e89de2bbcdbcf0ecb2f388d2b35d4917bb93c Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 18 Aug 2021 14:52:05 +0800 Subject: [PATCH 1/3] enhance performance when inserting from csv --- src/client/inc/tsclient.h | 8 +- src/client/src/tscParseInsert.c | 384 ++++++++++++++++++++++++++++++-- src/client/src/tscUtil.c | 1 + 3 files changed, 375 insertions(+), 18 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 4ead7d4180..99ed082236 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -123,17 +123,15 @@ typedef struct { int32_t kvLen; // len of SKVRow } SMemRowInfo; typedef struct { - uint8_t memRowType; - uint8_t compareStat; // 0 unknown, 1 need compare, 2 no need - TDRowTLenT dataRowInitLen; + uint8_t memRowType; // default is 0, that is SDataRow + uint8_t compareStat; // 0 no need, 1 need compare TDRowTLenT kvRowInitLen; SMemRowInfo *rowInfo; } SMemRowBuilder; typedef enum { - ROW_COMPARE_UNKNOWN = 0, + ROW_COMPARE_NO_NEED = 0, ROW_COMPARE_NEED = 1, - ROW_COMPARE_NO_NEED = 2, } ERowCompareStat; int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec); diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 89e3832007..dab0dff1fc 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -53,18 +53,18 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3 if (nBoundCols == 0) { // file input pBuilder->memRowType = SMEM_ROW_DATA; - pBuilder->compareStat = ROW_COMPARE_NO_NEED; + // pBuilder->compareStat = ROW_COMPARE_NO_NEED; return TSDB_CODE_SUCCESS; } else { float boundRatio = ((float)nBoundCols / (float)nCols); if (boundRatio < KVRatioKV) { pBuilder->memRowType = SMEM_ROW_KV; - pBuilder->compareStat = ROW_COMPARE_NO_NEED; + // pBuilder->compareStat = ROW_COMPARE_NO_NEED; return TSDB_CODE_SUCCESS; } else if (boundRatio > KVRatioData) { pBuilder->memRowType = SMEM_ROW_DATA; - 
pBuilder->compareStat = ROW_COMPARE_NO_NEED; + // pBuilder->compareStat = ROW_COMPARE_NO_NEED; return TSDB_CODE_SUCCESS; } pBuilder->compareStat = ROW_COMPARE_NEED; @@ -76,7 +76,6 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3 } } - pBuilder->dataRowInitLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen; pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + nBoundCols * sizeof(SColIdx); if (nRows > 0) { @@ -86,7 +85,7 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3 } for (int i = 0; i < nRows; ++i) { - (pBuilder->rowInfo + i)->dataLen = pBuilder->dataRowInitLen; + (pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen; (pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen; } } @@ -449,6 +448,370 @@ int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) { return TSDB_CODE_SUCCESS; } +static int32_t tsParseOneColumnOld(SSchema *pSchema, SStrToken *pToken, char *payload, char *msg, char **str, + bool primaryKey, int16_t timePrec) { + int64_t iv; + int32_t ret; + char * endptr = NULL; + + if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) { + return tscInvalidOperationMsg(msg, "invalid numeric data", pToken->z); + } + + switch (pSchema->type) { + case TSDB_DATA_TYPE_BOOL: { // bool + if (isNullStr(pToken)) { + *((uint8_t *)payload) = TSDB_DATA_BOOL_NULL; + } else { + if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { + if (strncmp(pToken->z, "true", pToken->n) == 0) { + *(uint8_t *)payload = TSDB_TRUE; + } else if (strncmp(pToken->z, "false", pToken->n) == 0) { + *(uint8_t *)payload = TSDB_FALSE; + } else { + return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z); + } + } else if (pToken->type == TK_INTEGER) { + iv = strtoll(pToken->z, NULL, 10); + *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE); + } else if (pToken->type == TK_FLOAT) { + double dv = strtod(pToken->z, NULL); + *(uint8_t *)payload = (int8_t)((dv == 0) ? 
TSDB_FALSE : TSDB_TRUE); + } else { + return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z); + } + } + break; + } + + case TSDB_DATA_TYPE_TINYINT: + if (isNullStr(pToken)) { + *((uint8_t *)payload) = TSDB_DATA_TINYINT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid tinyint data", pToken->z); + } else if (!IS_VALID_TINYINT(iv)) { + return tscInvalidOperationMsg(msg, "data overflow", pToken->z); + } + + *((uint8_t *)payload) = (uint8_t)iv; + } + + break; + + case TSDB_DATA_TYPE_UTINYINT: + if (isNullStr(pToken)) { + *((uint8_t *)payload) = TSDB_DATA_UTINYINT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid unsigned tinyint data", pToken->z); + } else if (!IS_VALID_UTINYINT(iv)) { + return tscInvalidOperationMsg(msg, "unsigned tinyint data overflow", pToken->z); + } + + *((uint8_t *)payload) = (uint8_t)iv; + } + + break; + + case TSDB_DATA_TYPE_SMALLINT: + if (isNullStr(pToken)) { + *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid smallint data", pToken->z); + } else if (!IS_VALID_SMALLINT(iv)) { + return tscInvalidOperationMsg(msg, "smallint data overflow", pToken->z); + } + + *((int16_t *)payload) = (int16_t)iv; + } + + break; + + case TSDB_DATA_TYPE_USMALLINT: + if (isNullStr(pToken)) { + *((uint16_t *)payload) = TSDB_DATA_USMALLINT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid unsigned smallint data", pToken->z); + } else if (!IS_VALID_USMALLINT(iv)) { + return tscInvalidOperationMsg(msg, "unsigned smallint data overflow", pToken->z); + } + 
+ *((uint16_t *)payload) = (uint16_t)iv; + } + + break; + + case TSDB_DATA_TYPE_INT: + if (isNullStr(pToken)) { + *((int32_t *)payload) = TSDB_DATA_INT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid int data", pToken->z); + } else if (!IS_VALID_INT(iv)) { + return tscInvalidOperationMsg(msg, "int data overflow", pToken->z); + } + + *((int32_t *)payload) = (int32_t)iv; + } + + break; + + case TSDB_DATA_TYPE_UINT: + if (isNullStr(pToken)) { + *((uint32_t *)payload) = TSDB_DATA_UINT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid unsigned int data", pToken->z); + } else if (!IS_VALID_UINT(iv)) { + return tscInvalidOperationMsg(msg, "unsigned int data overflow", pToken->z); + } + + *((uint32_t *)payload) = (uint32_t)iv; + } + + break; + + case TSDB_DATA_TYPE_BIGINT: + if (isNullStr(pToken)) { + *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid bigint data", pToken->z); + } else if (!IS_VALID_BIGINT(iv)) { + return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z); + } + + *((int64_t *)payload) = iv; + } + break; + + case TSDB_DATA_TYPE_UBIGINT: + if (isNullStr(pToken)) { + *((uint64_t *)payload) = TSDB_DATA_UBIGINT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid unsigned bigint data", pToken->z); + } else if (!IS_VALID_UBIGINT((uint64_t)iv)) { + return tscInvalidOperationMsg(msg, "unsigned bigint data overflow", pToken->z); + } + + *((uint64_t *)payload) = iv; + } + break; + + case TSDB_DATA_TYPE_FLOAT: + if (isNullStr(pToken)) { + *((int32_t 
*)payload) = TSDB_DATA_FLOAT_NULL; + } else { + double dv; + if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) { + return tscInvalidOperationMsg(msg, "illegal float data", pToken->z); + } + + if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || + isnan(dv)) { + return tscInvalidOperationMsg(msg, "illegal float data", pToken->z); + } + + // *((float *)payload) = (float)dv; + SET_FLOAT_VAL(payload, dv); + } + break; + + case TSDB_DATA_TYPE_DOUBLE: + if (isNullStr(pToken)) { + *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL; + } else { + double dv; + if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) { + return tscInvalidOperationMsg(msg, "illegal double data", pToken->z); + } + + if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) { + return tscInvalidOperationMsg(msg, "illegal double data", pToken->z); + } + + *((double *)payload) = dv; + } + break; + + case TSDB_DATA_TYPE_BINARY: + // binary data cannot be null-terminated char string, otherwise the last char of the string is lost + if (pToken->type == TK_NULL) { + setVardataNull(payload, TSDB_DATA_TYPE_BINARY); + } else { // too long values will return invalid sql, not be truncated automatically + if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor + return tscInvalidOperationMsg(msg, "string data overflow", pToken->z); + } + + STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n); + } + + break; + + case TSDB_DATA_TYPE_NCHAR: + if (pToken->type == TK_NULL) { + setVardataNull(payload, TSDB_DATA_TYPE_NCHAR); + } else { + // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' + int32_t output = 0; + if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) { + char buf[512] = {0}; + snprintf(buf, tListLen(buf), "%s", strerror(errno)); + return tscInvalidOperationMsg(msg, buf, pToken->z); + } + + 
varDataSetLen(payload, output); + } + break; + + case TSDB_DATA_TYPE_TIMESTAMP: { + if (pToken->type == TK_NULL) { + if (primaryKey) { + *((int64_t *)payload) = 0; + } else { + *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL; + } + } else { + int64_t temp; + if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z); + } + + *((int64_t *)payload) = temp; + } + + break; + } + } + + return TSDB_CODE_SUCCESS; +} + +static int tsParseOneRowOld(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *len, + char *tmpTokenBuf, SInsertStatementParam *pInsertParam) { + int32_t index = 0; + SStrToken sToken = {0}; + char * payload = pDataBlocks->pData + pDataBlocks->size; + + SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo; + SSchema * schema = tscGetTableSchema(pDataBlocks->pTableMeta); + + // 1. set the parsed value from sql string + int32_t rowSize = 0; + for (int i = 0; i < spd->numOfBound; ++i) { + // the start position in data block buffer of current value in sql + int32_t colIndex = spd->boundedColumns[i]; + + char * start = payload + spd->cols[colIndex].offset; + SSchema *pSchema = &schema[colIndex]; + rowSize += pSchema->bytes; + + index = 0; + sToken = tStrGetToken(*str, &index, true); + *str += index; + + if (sToken.type == TK_QUESTION) { + if (pInsertParam->insertType != TSDB_QUERY_TYPE_STMT_INSERT) { + return tscSQLSyntaxErrMsg(pInsertParam->msg, "? 
only allowed in binding insertion", *str); + } + + uint32_t offset = (uint32_t)(start - pDataBlocks->pData); + if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) { + continue; + } + + strcpy(pInsertParam->msg, "client out of memory"); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + int16_t type = sToken.type; + if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL && + type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || + (sToken.n == 0) || (type == TK_RP)) { + return tscSQLSyntaxErrMsg(pInsertParam->msg, "invalid data or symbol", sToken.z); + } + + // Remove quotation marks + if (TK_STRING == sToken.type) { + // delete escape character: \\, \', \" + char delim = sToken.z[0]; + + int32_t cnt = 0; + int32_t j = 0; + if (sToken.n >= TSDB_MAX_BYTES_PER_ROW) { + return tscSQLSyntaxErrMsg(pInsertParam->msg, "too long string", sToken.z); + } + + for (uint32_t k = 1; k < sToken.n - 1; ++k) { + if (sToken.z[k] == '\\' || (sToken.z[k] == delim && sToken.z[k + 1] == delim)) { + tmpTokenBuf[j] = sToken.z[k + 1]; + + cnt++; + j++; + k++; + continue; + } + + tmpTokenBuf[j] = sToken.z[k]; + j++; + } + + tmpTokenBuf[j] = 0; + sToken.z = tmpTokenBuf; + sToken.n -= 2 + cnt; + } + + bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX); + int32_t ret = tsParseOneColumnOld(pSchema, &sToken, start, pInsertParam->msg, str, isPrimaryKey, timePrec); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + + if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) { + tscInvalidOperationMsg(pInsertParam->msg, "client time/server time can not be mixed up", sToken.z); + return TSDB_CODE_TSC_INVALID_TIME_STAMP; + } + } + + // 2. 
set the null value for the columns that do not assign values + if (spd->numOfBound < spd->numOfCols) { + char *ptr = payload; + + for (int32_t i = 0; i < spd->numOfCols; ++i) { + if (spd->cols[i].valStat == VAL_STAT_NONE) { // current column do not have any value to insert, set it to null + if (schema[i].type == TSDB_DATA_TYPE_BINARY) { + varDataSetLen(ptr, sizeof(int8_t)); + *(uint8_t *)varDataVal(ptr) = TSDB_DATA_BINARY_NULL; + } else if (schema[i].type == TSDB_DATA_TYPE_NCHAR) { + varDataSetLen(ptr, sizeof(int32_t)); + *(uint32_t *)varDataVal(ptr) = TSDB_DATA_NCHAR_NULL; + } else { + setNull(ptr, schema[i].type, schema[i].bytes); + } + } + + ptr += schema[i].bytes; + } + + rowSize = (int32_t)(ptr - payload); + } + + *len = rowSize; + return TSDB_CODE_SUCCESS; +} + int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *len, char *tmpTokenBuf, SInsertStatementParam *pInsertParam) { int32_t index = 0; @@ -460,7 +823,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i STableMeta * pTableMeta = pDataBlocks->pTableMeta; SSchema * schema = tscGetTableSchema(pTableMeta); SMemRowBuilder * pBuilder = &pDataBlocks->rowBuilder; - int32_t dataLen = pBuilder->dataRowInitLen; + int32_t dataLen = spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE; int32_t kvLen = pBuilder->kvRowInitLen; bool isParseBindParam = false; @@ -1698,6 +2061,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow STableComInfo tinfo = tscGetTableInfo(pTableMeta); SInsertStatementParam* pInsertParam = &pCmd->insertParam; + pInsertParam->payloadType = PAYLOAD_TYPE_RAW; destroyTableNameList(pInsertParam); pInsertParam->pDataBlocks = tscDestroyBlockArrayList(pInsertParam->pDataBlocks); @@ -1726,12 +2090,6 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow goto _error; } - if (TSDB_CODE_SUCCESS != - (ret = initMemRowBuilder(&pTableDataBlock->rowBuilder, 0, tinfo.numOfColumns, 
pTableDataBlock->numOfParams, - pTableDataBlock->boundColumnInfo.allNullLen))) { - goto _error; - } - while ((readLen = tgetline(&line, &n, fp)) != -1) { if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) { line[--readLen] = 0; @@ -1745,7 +2103,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow strtolower(line, line); int32_t len = 0; - code = tsParseOneRow(&lineptr, pTableDataBlock, tinfo.precision, &len, tokenBuf, pInsertParam); + code = tsParseOneRowOld(&lineptr, pTableDataBlock, tinfo.precision, &len, tokenBuf, pInsertParam); if (code != TSDB_CODE_SUCCESS || pTableDataBlock->numOfParams > 0) { pSql->res.code = code; break; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 612d6a5642..f3e30172ab 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1744,6 +1744,7 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff dataBuf->tsSource = -1; dataBuf->vgId = dataBuf->pTableMeta->vgId; + tNameAssign(&dataBuf->tableName, name); assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL); From 3155e72501e69d7ff3936cdf4927cd5e9215b262 Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 18 Aug 2021 19:43:44 +0800 Subject: [PATCH 2/3] enhance --- src/client/src/tscParseInsert.c | 398 +++----------------------------- 1 file changed, 29 insertions(+), 369 deletions(-) diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index dab0dff1fc..793cee4ca2 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -448,370 +448,6 @@ int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) { return TSDB_CODE_SUCCESS; } -static int32_t tsParseOneColumnOld(SSchema *pSchema, SStrToken *pToken, char *payload, char *msg, char **str, - bool primaryKey, int16_t timePrec) { - int64_t iv; - int32_t ret; - char * endptr = NULL; - - if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) { - 
return tscInvalidOperationMsg(msg, "invalid numeric data", pToken->z); - } - - switch (pSchema->type) { - case TSDB_DATA_TYPE_BOOL: { // bool - if (isNullStr(pToken)) { - *((uint8_t *)payload) = TSDB_DATA_BOOL_NULL; - } else { - if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { - if (strncmp(pToken->z, "true", pToken->n) == 0) { - *(uint8_t *)payload = TSDB_TRUE; - } else if (strncmp(pToken->z, "false", pToken->n) == 0) { - *(uint8_t *)payload = TSDB_FALSE; - } else { - return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z); - } - } else if (pToken->type == TK_INTEGER) { - iv = strtoll(pToken->z, NULL, 10); - *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE); - } else if (pToken->type == TK_FLOAT) { - double dv = strtod(pToken->z, NULL); - *(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE); - } else { - return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z); - } - } - break; - } - - case TSDB_DATA_TYPE_TINYINT: - if (isNullStr(pToken)) { - *((uint8_t *)payload) = TSDB_DATA_TINYINT_NULL; - } else { - ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); - if (ret != TSDB_CODE_SUCCESS) { - return tscInvalidOperationMsg(msg, "invalid tinyint data", pToken->z); - } else if (!IS_VALID_TINYINT(iv)) { - return tscInvalidOperationMsg(msg, "data overflow", pToken->z); - } - - *((uint8_t *)payload) = (uint8_t)iv; - } - - break; - - case TSDB_DATA_TYPE_UTINYINT: - if (isNullStr(pToken)) { - *((uint8_t *)payload) = TSDB_DATA_UTINYINT_NULL; - } else { - ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); - if (ret != TSDB_CODE_SUCCESS) { - return tscInvalidOperationMsg(msg, "invalid unsigned tinyint data", pToken->z); - } else if (!IS_VALID_UTINYINT(iv)) { - return tscInvalidOperationMsg(msg, "unsigned tinyint data overflow", pToken->z); - } - - *((uint8_t *)payload) = (uint8_t)iv; - } - - break; - - case TSDB_DATA_TYPE_SMALLINT: - if (isNullStr(pToken)) { - 
*((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL; - } else { - ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); - if (ret != TSDB_CODE_SUCCESS) { - return tscInvalidOperationMsg(msg, "invalid smallint data", pToken->z); - } else if (!IS_VALID_SMALLINT(iv)) { - return tscInvalidOperationMsg(msg, "smallint data overflow", pToken->z); - } - - *((int16_t *)payload) = (int16_t)iv; - } - - break; - - case TSDB_DATA_TYPE_USMALLINT: - if (isNullStr(pToken)) { - *((uint16_t *)payload) = TSDB_DATA_USMALLINT_NULL; - } else { - ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); - if (ret != TSDB_CODE_SUCCESS) { - return tscInvalidOperationMsg(msg, "invalid unsigned smallint data", pToken->z); - } else if (!IS_VALID_USMALLINT(iv)) { - return tscInvalidOperationMsg(msg, "unsigned smallint data overflow", pToken->z); - } - - *((uint16_t *)payload) = (uint16_t)iv; - } - - break; - - case TSDB_DATA_TYPE_INT: - if (isNullStr(pToken)) { - *((int32_t *)payload) = TSDB_DATA_INT_NULL; - } else { - ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); - if (ret != TSDB_CODE_SUCCESS) { - return tscInvalidOperationMsg(msg, "invalid int data", pToken->z); - } else if (!IS_VALID_INT(iv)) { - return tscInvalidOperationMsg(msg, "int data overflow", pToken->z); - } - - *((int32_t *)payload) = (int32_t)iv; - } - - break; - - case TSDB_DATA_TYPE_UINT: - if (isNullStr(pToken)) { - *((uint32_t *)payload) = TSDB_DATA_UINT_NULL; - } else { - ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); - if (ret != TSDB_CODE_SUCCESS) { - return tscInvalidOperationMsg(msg, "invalid unsigned int data", pToken->z); - } else if (!IS_VALID_UINT(iv)) { - return tscInvalidOperationMsg(msg, "unsigned int data overflow", pToken->z); - } - - *((uint32_t *)payload) = (uint32_t)iv; - } - - break; - - case TSDB_DATA_TYPE_BIGINT: - if (isNullStr(pToken)) { - *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL; - } else { - ret = tStrToInteger(pToken->z, 
pToken->type, pToken->n, &iv, true); - if (ret != TSDB_CODE_SUCCESS) { - return tscInvalidOperationMsg(msg, "invalid bigint data", pToken->z); - } else if (!IS_VALID_BIGINT(iv)) { - return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z); - } - - *((int64_t *)payload) = iv; - } - break; - - case TSDB_DATA_TYPE_UBIGINT: - if (isNullStr(pToken)) { - *((uint64_t *)payload) = TSDB_DATA_UBIGINT_NULL; - } else { - ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); - if (ret != TSDB_CODE_SUCCESS) { - return tscInvalidOperationMsg(msg, "invalid unsigned bigint data", pToken->z); - } else if (!IS_VALID_UBIGINT((uint64_t)iv)) { - return tscInvalidOperationMsg(msg, "unsigned bigint data overflow", pToken->z); - } - - *((uint64_t *)payload) = iv; - } - break; - - case TSDB_DATA_TYPE_FLOAT: - if (isNullStr(pToken)) { - *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL; - } else { - double dv; - if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) { - return tscInvalidOperationMsg(msg, "illegal float data", pToken->z); - } - - if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || - isnan(dv)) { - return tscInvalidOperationMsg(msg, "illegal float data", pToken->z); - } - - // *((float *)payload) = (float)dv; - SET_FLOAT_VAL(payload, dv); - } - break; - - case TSDB_DATA_TYPE_DOUBLE: - if (isNullStr(pToken)) { - *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL; - } else { - double dv; - if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) { - return tscInvalidOperationMsg(msg, "illegal double data", pToken->z); - } - - if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) { - return tscInvalidOperationMsg(msg, "illegal double data", pToken->z); - } - - *((double *)payload) = dv; - } - break; - - case TSDB_DATA_TYPE_BINARY: - // binary data cannot be null-terminated char string, otherwise the last char of the string is lost - if (pToken->type == TK_NULL) { - 
setVardataNull(payload, TSDB_DATA_TYPE_BINARY); - } else { // too long values will return invalid sql, not be truncated automatically - if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor - return tscInvalidOperationMsg(msg, "string data overflow", pToken->z); - } - - STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n); - } - - break; - - case TSDB_DATA_TYPE_NCHAR: - if (pToken->type == TK_NULL) { - setVardataNull(payload, TSDB_DATA_TYPE_NCHAR); - } else { - // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' - int32_t output = 0; - if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) { - char buf[512] = {0}; - snprintf(buf, tListLen(buf), "%s", strerror(errno)); - return tscInvalidOperationMsg(msg, buf, pToken->z); - } - - varDataSetLen(payload, output); - } - break; - - case TSDB_DATA_TYPE_TIMESTAMP: { - if (pToken->type == TK_NULL) { - if (primaryKey) { - *((int64_t *)payload) = 0; - } else { - *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL; - } - } else { - int64_t temp; - if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) { - return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z); - } - - *((int64_t *)payload) = temp; - } - - break; - } - } - - return TSDB_CODE_SUCCESS; -} - -static int tsParseOneRowOld(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *len, - char *tmpTokenBuf, SInsertStatementParam *pInsertParam) { - int32_t index = 0; - SStrToken sToken = {0}; - char * payload = pDataBlocks->pData + pDataBlocks->size; - - SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo; - SSchema * schema = tscGetTableSchema(pDataBlocks->pTableMeta); - - // 1. 
set the parsed value from sql string - int32_t rowSize = 0; - for (int i = 0; i < spd->numOfBound; ++i) { - // the start position in data block buffer of current value in sql - int32_t colIndex = spd->boundedColumns[i]; - - char * start = payload + spd->cols[colIndex].offset; - SSchema *pSchema = &schema[colIndex]; - rowSize += pSchema->bytes; - - index = 0; - sToken = tStrGetToken(*str, &index, true); - *str += index; - - if (sToken.type == TK_QUESTION) { - if (pInsertParam->insertType != TSDB_QUERY_TYPE_STMT_INSERT) { - return tscSQLSyntaxErrMsg(pInsertParam->msg, "? only allowed in binding insertion", *str); - } - - uint32_t offset = (uint32_t)(start - pDataBlocks->pData); - if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) { - continue; - } - - strcpy(pInsertParam->msg, "client out of memory"); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - int16_t type = sToken.type; - if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL && - type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || - (sToken.n == 0) || (type == TK_RP)) { - return tscSQLSyntaxErrMsg(pInsertParam->msg, "invalid data or symbol", sToken.z); - } - - // Remove quotation marks - if (TK_STRING == sToken.type) { - // delete escape character: \\, \', \" - char delim = sToken.z[0]; - - int32_t cnt = 0; - int32_t j = 0; - if (sToken.n >= TSDB_MAX_BYTES_PER_ROW) { - return tscSQLSyntaxErrMsg(pInsertParam->msg, "too long string", sToken.z); - } - - for (uint32_t k = 1; k < sToken.n - 1; ++k) { - if (sToken.z[k] == '\\' || (sToken.z[k] == delim && sToken.z[k + 1] == delim)) { - tmpTokenBuf[j] = sToken.z[k + 1]; - - cnt++; - j++; - k++; - continue; - } - - tmpTokenBuf[j] = sToken.z[k]; - j++; - } - - tmpTokenBuf[j] = 0; - sToken.z = tmpTokenBuf; - sToken.n -= 2 + cnt; - } - - bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX); - int32_t ret = tsParseOneColumnOld(pSchema, 
&sToken, start, pInsertParam->msg, str, isPrimaryKey, timePrec); - if (ret != TSDB_CODE_SUCCESS) { - return ret; - } - - if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) { - tscInvalidOperationMsg(pInsertParam->msg, "client time/server time can not be mixed up", sToken.z); - return TSDB_CODE_TSC_INVALID_TIME_STAMP; - } - } - - // 2. set the null value for the columns that do not assign values - if (spd->numOfBound < spd->numOfCols) { - char *ptr = payload; - - for (int32_t i = 0; i < spd->numOfCols; ++i) { - if (spd->cols[i].valStat == VAL_STAT_NONE) { // current column do not have any value to insert, set it to null - if (schema[i].type == TSDB_DATA_TYPE_BINARY) { - varDataSetLen(ptr, sizeof(int8_t)); - *(uint8_t *)varDataVal(ptr) = TSDB_DATA_BINARY_NULL; - } else if (schema[i].type == TSDB_DATA_TYPE_NCHAR) { - varDataSetLen(ptr, sizeof(int32_t)); - *(uint32_t *)varDataVal(ptr) = TSDB_DATA_NCHAR_NULL; - } else { - setNull(ptr, schema[i].type, schema[i].bytes); - } - } - - ptr += schema[i].bytes; - } - - rowSize = (int32_t)(ptr - payload); - } - - *len = rowSize; - return TSDB_CODE_SUCCESS; -} - int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *len, char *tmpTokenBuf, SInsertStatementParam *pInsertParam) { int32_t index = 0; @@ -1036,6 +672,7 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn } } +#if 1 void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols) { pColInfo->numOfCols = numOfCols; pColInfo->numOfBound = numOfCols; @@ -1071,6 +708,30 @@ void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32 pColInfo->allNullLen += pColInfo->flen; pColInfo->extendedVarLen = (uint16_t)(nVar * sizeof(VarDataOffsetT)); } +#endif + +#if 0 +void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols) { + pColInfo->numOfCols = numOfCols; + pColInfo->numOfBound = numOfCols; + 
pColInfo->orderStatus = ORDER_STATUS_ORDERED; // default is ORDERED for non-bound mode + + pColInfo->boundedColumns = calloc(pColInfo->numOfCols, sizeof(int32_t)); + pColInfo->cols = calloc(pColInfo->numOfCols, sizeof(SBoundColumn)); + pColInfo->colIdxInfo = NULL; + pColInfo->flen = 0; + pColInfo->allNullLen = 0; + + for (int32_t i = 0; i < pColInfo->numOfCols; ++i) { + if (i > 0) { + pColInfo->cols[i].offset = pSchema[i - 1].bytes + pColInfo->cols[i - 1].offset; + } + + pColInfo->cols[i].valStat = VAL_STAT_HAS; + pColInfo->boundedColumns[i] = i; + } +} +#endif int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) { size_t remain = pDataBlock->nAllocSize - pDataBlock->size; @@ -1172,13 +833,12 @@ int tscSortRemoveDataBlockDupRows(STableDataBlocks *dataBuf, SBlockKeyInfo *pBlk // allocate memory size_t nAlloc = nRows * sizeof(SBlockKeyTuple); if (pBlkKeyInfo->pKeyTuple == NULL || pBlkKeyInfo->maxBytesAlloc < nAlloc) { - size_t nRealAlloc = nAlloc + 10 * sizeof(SBlockKeyTuple); - char * tmp = trealloc(pBlkKeyInfo->pKeyTuple, nRealAlloc); + char *tmp = trealloc(pBlkKeyInfo->pKeyTuple, nAlloc); if (tmp == NULL) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } pBlkKeyInfo->pKeyTuple = (SBlockKeyTuple *)tmp; - pBlkKeyInfo->maxBytesAlloc = (int32_t)nRealAlloc; + pBlkKeyInfo->maxBytesAlloc = (int32_t)nAlloc; } memset(pBlkKeyInfo->pKeyTuple, 0, nAlloc); @@ -2061,7 +1721,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow STableComInfo tinfo = tscGetTableInfo(pTableMeta); SInsertStatementParam* pInsertParam = &pCmd->insertParam; - pInsertParam->payloadType = PAYLOAD_TYPE_RAW; + // pInsertParam->payloadType = PAYLOAD_TYPE_RAW; destroyTableNameList(pInsertParam); pInsertParam->pDataBlocks = tscDestroyBlockArrayList(pInsertParam->pDataBlocks); @@ -2103,7 +1763,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow strtolower(line, line); int32_t len = 0; - code = 
tsParseOneRowOld(&lineptr, pTableDataBlock, tinfo.precision, &len, tokenBuf, pInsertParam); + code = tsParseOneRow(&lineptr, pTableDataBlock, tinfo.precision, &len, tokenBuf, pInsertParam); if (code != TSDB_CODE_SUCCESS || pTableDataBlock->numOfParams > 0) { pSql->res.code = code; break; From 2cddc07d2b95f9dc189de49dee5f7bd5d8289ebf Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 18 Aug 2021 19:48:30 +0800 Subject: [PATCH 3/3] [TD-6184]: optimize insert from imported file --- src/client/src/tscParseInsert.c | 32 ++------------------------------ src/client/src/tscUtil.c | 1 - 2 files changed, 2 insertions(+), 31 deletions(-) diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 793cee4ca2..f5d9a6a17e 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -51,20 +51,18 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3 } } + // default compareStat is ROW_COMPARE_NO_NEED if (nBoundCols == 0) { // file input pBuilder->memRowType = SMEM_ROW_DATA; - // pBuilder->compareStat = ROW_COMPARE_NO_NEED; return TSDB_CODE_SUCCESS; } else { float boundRatio = ((float)nBoundCols / (float)nCols); if (boundRatio < KVRatioKV) { pBuilder->memRowType = SMEM_ROW_KV; - // pBuilder->compareStat = ROW_COMPARE_NO_NEED; return TSDB_CODE_SUCCESS; } else if (boundRatio > KVRatioData) { pBuilder->memRowType = SMEM_ROW_DATA; - // pBuilder->compareStat = ROW_COMPARE_NO_NEED; return TSDB_CODE_SUCCESS; } pBuilder->compareStat = ROW_COMPARE_NEED; @@ -672,7 +670,6 @@ int32_t tsParseValues(char **str, STableDataBlocks *pDataBlock, int maxRows, SIn } } -#if 1 void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols) { pColInfo->numOfCols = numOfCols; pColInfo->numOfBound = numOfCols; @@ -708,30 +705,6 @@ void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32 pColInfo->allNullLen += pColInfo->flen; pColInfo->extendedVarLen = (uint16_t)(nVar * 
sizeof(VarDataOffsetT)); } -#endif - -#if 0 -void tscSetBoundColumnInfo(SParsedDataColInfo *pColInfo, SSchema *pSchema, int32_t numOfCols) { - pColInfo->numOfCols = numOfCols; - pColInfo->numOfBound = numOfCols; - pColInfo->orderStatus = ORDER_STATUS_ORDERED; // default is ORDERED for non-bound mode - - pColInfo->boundedColumns = calloc(pColInfo->numOfCols, sizeof(int32_t)); - pColInfo->cols = calloc(pColInfo->numOfCols, sizeof(SBoundColumn)); - pColInfo->colIdxInfo = NULL; - pColInfo->flen = 0; - pColInfo->allNullLen = 0; - - for (int32_t i = 0; i < pColInfo->numOfCols; ++i) { - if (i > 0) { - pColInfo->cols[i].offset = pSchema[i - 1].bytes + pColInfo->cols[i - 1].offset; - } - - pColInfo->cols[i].valStat = VAL_STAT_HAS; - pColInfo->boundedColumns[i] = i; - } -} -#endif int32_t tscAllocateMemIfNeed(STableDataBlocks *pDataBlock, int32_t rowSize, int32_t * numOfRows) { size_t remain = pDataBlock->nAllocSize - pDataBlock->size; @@ -1720,8 +1693,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow STableMeta * pTableMeta = pTableMetaInfo->pTableMeta; STableComInfo tinfo = tscGetTableInfo(pTableMeta); - SInsertStatementParam* pInsertParam = &pCmd->insertParam; - // pInsertParam->payloadType = PAYLOAD_TYPE_RAW; + SInsertStatementParam *pInsertParam = &pCmd->insertParam; destroyTableNameList(pInsertParam); pInsertParam->pDataBlocks = tscDestroyBlockArrayList(pInsertParam->pDataBlocks); diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index f3e30172ab..612d6a5642 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1744,7 +1744,6 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff dataBuf->tsSource = -1; dataBuf->vgId = dataBuf->pTableMeta->vgId; - tNameAssign(&dataBuf->tableName, name); assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);