From 1c0e89de2bbcdbcf0ecb2f388d2b35d4917bb93c Mon Sep 17 00:00:00 2001 From: Cary Xu Date: Wed, 18 Aug 2021 14:52:05 +0800 Subject: [PATCH] enhance performacne when inserting from csv --- src/client/inc/tsclient.h | 8 +- src/client/src/tscParseInsert.c | 384 ++++++++++++++++++++++++++++++-- src/client/src/tscUtil.c | 1 + 3 files changed, 375 insertions(+), 18 deletions(-) diff --git a/src/client/inc/tsclient.h b/src/client/inc/tsclient.h index 4ead7d4180..99ed082236 100644 --- a/src/client/inc/tsclient.h +++ b/src/client/inc/tsclient.h @@ -123,17 +123,15 @@ typedef struct { int32_t kvLen; // len of SKVRow } SMemRowInfo; typedef struct { - uint8_t memRowType; - uint8_t compareStat; // 0 unknown, 1 need compare, 2 no need - TDRowTLenT dataRowInitLen; + uint8_t memRowType; // default is 0, that is SDataRow + uint8_t compareStat; // 0 no need, 1 need compare TDRowTLenT kvRowInitLen; SMemRowInfo *rowInfo; } SMemRowBuilder; typedef enum { - ROW_COMPARE_UNKNOWN = 0, + ROW_COMPARE_NO_NEED = 0, ROW_COMPARE_NEED = 1, - ROW_COMPARE_NO_NEED = 2, } ERowCompareStat; int tsParseTime(SStrToken *pToken, int64_t *time, char **next, char *error, int16_t timePrec); diff --git a/src/client/src/tscParseInsert.c b/src/client/src/tscParseInsert.c index 89e3832007..dab0dff1fc 100644 --- a/src/client/src/tscParseInsert.c +++ b/src/client/src/tscParseInsert.c @@ -53,18 +53,18 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3 if (nBoundCols == 0) { // file input pBuilder->memRowType = SMEM_ROW_DATA; - pBuilder->compareStat = ROW_COMPARE_NO_NEED; + // pBuilder->compareStat = ROW_COMPARE_NO_NEED; return TSDB_CODE_SUCCESS; } else { float boundRatio = ((float)nBoundCols / (float)nCols); if (boundRatio < KVRatioKV) { pBuilder->memRowType = SMEM_ROW_KV; - pBuilder->compareStat = ROW_COMPARE_NO_NEED; + // pBuilder->compareStat = ROW_COMPARE_NO_NEED; return TSDB_CODE_SUCCESS; } else if (boundRatio > KVRatioData) { pBuilder->memRowType = SMEM_ROW_DATA; - pBuilder->compareStat = ROW_COMPARE_NO_NEED; + // pBuilder->compareStat = ROW_COMPARE_NO_NEED; return TSDB_CODE_SUCCESS; } pBuilder->compareStat = ROW_COMPARE_NEED; @@ -76,7 +76,6 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3 } } - pBuilder->dataRowInitLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen; pBuilder->kvRowInitLen = TD_MEM_ROW_KV_HEAD_SIZE + nBoundCols * sizeof(SColIdx); if (nRows > 0) { @@ -86,7 +85,7 @@ int initMemRowBuilder(SMemRowBuilder *pBuilder, uint32_t nRows, uint3 } for (int i = 0; i < nRows; ++i) { - (pBuilder->rowInfo + i)->dataLen = pBuilder->dataRowInitLen; + (pBuilder->rowInfo + i)->dataLen = TD_MEM_ROW_DATA_HEAD_SIZE + allNullLen; (pBuilder->rowInfo + i)->kvLen = pBuilder->kvRowInitLen; } } @@ -449,6 +448,370 @@ int32_t tsCheckTimestamp(STableDataBlocks *pDataBlocks, const char *start) { return TSDB_CODE_SUCCESS; } +static int32_t tsParseOneColumnOld(SSchema *pSchema, SStrToken *pToken, char *payload, char *msg, char **str, + bool primaryKey, int16_t timePrec) { + int64_t iv; + int32_t ret; + char * endptr = NULL; + + if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) { + return tscInvalidOperationMsg(msg, "invalid numeric data", pToken->z); + } + + switch (pSchema->type) { + case TSDB_DATA_TYPE_BOOL: { // bool + if (isNullStr(pToken)) { + *((uint8_t *)payload) = TSDB_DATA_BOOL_NULL; + } else { + if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { + if (strncmp(pToken->z, "true", pToken->n) == 0) { + *(uint8_t *)payload = TSDB_TRUE; + } else if (strncmp(pToken->z, "false", pToken->n) == 0) { + *(uint8_t *)payload = TSDB_FALSE; + } else { + return tscSQLSyntaxErrMsg(msg, "invalid bool data", pToken->z); + } + } else if (pToken->type == TK_INTEGER) { + iv = strtoll(pToken->z, NULL, 10); + *(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE); + } else if (pToken->type == TK_FLOAT) { + double dv = strtod(pToken->z, NULL); + *(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE); + } else { + return tscInvalidOperationMsg(msg, "invalid bool data", pToken->z); + } + } + break; + } + + case TSDB_DATA_TYPE_TINYINT: + if (isNullStr(pToken)) { + *((uint8_t *)payload) = TSDB_DATA_TINYINT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid tinyint data", pToken->z); + } else if (!IS_VALID_TINYINT(iv)) { + return tscInvalidOperationMsg(msg, "data overflow", pToken->z); + } + + *((uint8_t *)payload) = (uint8_t)iv; + } + + break; + + case TSDB_DATA_TYPE_UTINYINT: + if (isNullStr(pToken)) { + *((uint8_t *)payload) = TSDB_DATA_UTINYINT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid unsigned tinyint data", pToken->z); + } else if (!IS_VALID_UTINYINT(iv)) { + return tscInvalidOperationMsg(msg, "unsigned tinyint data overflow", pToken->z); + } + + *((uint8_t *)payload) = (uint8_t)iv; + } + + break; + + case TSDB_DATA_TYPE_SMALLINT: + if (isNullStr(pToken)) { + *((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid smallint data", pToken->z); + } else if (!IS_VALID_SMALLINT(iv)) { + return tscInvalidOperationMsg(msg, "smallint data overflow", pToken->z); + } + + *((int16_t *)payload) = (int16_t)iv; + } + + break; + + case TSDB_DATA_TYPE_USMALLINT: + if (isNullStr(pToken)) { + *((uint16_t *)payload) = TSDB_DATA_USMALLINT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid unsigned smallint data", pToken->z); + } else if (!IS_VALID_USMALLINT(iv)) { + return tscInvalidOperationMsg(msg, "unsigned smallint data overflow", pToken->z); + } + + *((uint16_t *)payload) = (uint16_t)iv; + } + + break; + + case TSDB_DATA_TYPE_INT: + if (isNullStr(pToken)) { + *((int32_t *)payload) = TSDB_DATA_INT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid int data", pToken->z); + } else if (!IS_VALID_INT(iv)) { + return tscInvalidOperationMsg(msg, "int data overflow", pToken->z); + } + + *((int32_t *)payload) = (int32_t)iv; + } + + break; + + case TSDB_DATA_TYPE_UINT: + if (isNullStr(pToken)) { + *((uint32_t *)payload) = TSDB_DATA_UINT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid unsigned int data", pToken->z); + } else if (!IS_VALID_UINT(iv)) { + return tscInvalidOperationMsg(msg, "unsigned int data overflow", pToken->z); + } + + *((uint32_t *)payload) = (uint32_t)iv; + } + + break; + + case TSDB_DATA_TYPE_BIGINT: + if (isNullStr(pToken)) { + *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, true); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid bigint data", pToken->z); + } else if (!IS_VALID_BIGINT(iv)) { + return tscInvalidOperationMsg(msg, "bigint data overflow", pToken->z); + } + + *((int64_t *)payload) = iv; + } + break; + + case TSDB_DATA_TYPE_UBIGINT: + if (isNullStr(pToken)) { + *((uint64_t *)payload) = TSDB_DATA_UBIGINT_NULL; + } else { + ret = tStrToInteger(pToken->z, pToken->type, pToken->n, &iv, false); + if (ret != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid unsigned bigint data", pToken->z); + } else if (!IS_VALID_UBIGINT((uint64_t)iv)) { + return tscInvalidOperationMsg(msg, "unsigned bigint data overflow", pToken->z); + } + + *((uint64_t *)payload) = iv; + } + break; + + case TSDB_DATA_TYPE_FLOAT: + if (isNullStr(pToken)) { + *((int32_t *)payload) = TSDB_DATA_FLOAT_NULL; + } else { + double dv; + if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) { + return tscInvalidOperationMsg(msg, "illegal float data", pToken->z); + } + + if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || + isnan(dv)) { + return tscInvalidOperationMsg(msg, "illegal float data", pToken->z); + } + + // *((float *)payload) = (float)dv; + SET_FLOAT_VAL(payload, dv); + } + break; + + case TSDB_DATA_TYPE_DOUBLE: + if (isNullStr(pToken)) { + *((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL; + } else { + double dv; + if (TK_ILLEGAL == tscToDouble(pToken, &dv, &endptr)) { + return tscInvalidOperationMsg(msg, "illegal double data", pToken->z); + } + + if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) { + return tscInvalidOperationMsg(msg, "illegal double data", pToken->z); + } + + *((double *)payload) = dv; + } + break; + + case TSDB_DATA_TYPE_BINARY: + // binary data cannot be null-terminated char string, otherwise the last char of the string is lost + if (pToken->type == TK_NULL) { + setVardataNull(payload, TSDB_DATA_TYPE_BINARY); + } else { // too long values will return invalid sql, not be truncated automatically + if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor + return tscInvalidOperationMsg(msg, "string data overflow", pToken->z); + } + + STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n); + } + + break; + + case TSDB_DATA_TYPE_NCHAR: + if (pToken->type == TK_NULL) { + setVardataNull(payload, TSDB_DATA_TYPE_NCHAR); + } else { + // if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long' + int32_t output = 0; + if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) { + char buf[512] = {0}; + snprintf(buf, tListLen(buf), "%s", strerror(errno)); + return tscInvalidOperationMsg(msg, buf, pToken->z); + } + + varDataSetLen(payload, output); + } + break; + + case TSDB_DATA_TYPE_TIMESTAMP: { + if (pToken->type == TK_NULL) { + if (primaryKey) { + *((int64_t *)payload) = 0; + } else { + *((int64_t *)payload) = TSDB_DATA_BIGINT_NULL; + } + } else { + int64_t temp; + if (tsParseTime(pToken, &temp, str, msg, timePrec) != TSDB_CODE_SUCCESS) { + return tscInvalidOperationMsg(msg, "invalid timestamp", pToken->z); + } + + *((int64_t *)payload) = temp; + } + + break; + } + } + + return TSDB_CODE_SUCCESS; +} + +static int tsParseOneRowOld(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *len, + char *tmpTokenBuf, SInsertStatementParam *pInsertParam) { + int32_t index = 0; + SStrToken sToken = {0}; + char * payload = pDataBlocks->pData + pDataBlocks->size; + + SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo; + SSchema * schema = tscGetTableSchema(pDataBlocks->pTableMeta); + + // 1. set the parsed value from sql string + int32_t rowSize = 0; + for (int i = 0; i < spd->numOfBound; ++i) { + // the start position in data block buffer of current value in sql + int32_t colIndex = spd->boundedColumns[i]; + + char * start = payload + spd->cols[colIndex].offset; + SSchema *pSchema = &schema[colIndex]; + rowSize += pSchema->bytes; + + index = 0; + sToken = tStrGetToken(*str, &index, true); + *str += index; + + if (sToken.type == TK_QUESTION) { + if (pInsertParam->insertType != TSDB_QUERY_TYPE_STMT_INSERT) { + return tscSQLSyntaxErrMsg(pInsertParam->msg, "? only allowed in binding insertion", *str); + } + + uint32_t offset = (uint32_t)(start - pDataBlocks->pData); + if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) { + continue; + } + + strcpy(pInsertParam->msg, "client out of memory"); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + + int16_t type = sToken.type; + if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL && + type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) || + (sToken.n == 0) || (type == TK_RP)) { + return tscSQLSyntaxErrMsg(pInsertParam->msg, "invalid data or symbol", sToken.z); + } + + // Remove quotation marks + if (TK_STRING == sToken.type) { + // delete escape character: \\, \', \" + char delim = sToken.z[0]; + + int32_t cnt = 0; + int32_t j = 0; + if (sToken.n >= TSDB_MAX_BYTES_PER_ROW) { + return tscSQLSyntaxErrMsg(pInsertParam->msg, "too long string", sToken.z); + } + + for (uint32_t k = 1; k < sToken.n - 1; ++k) { + if (sToken.z[k] == '\\' || (sToken.z[k] == delim && sToken.z[k + 1] == delim)) { + tmpTokenBuf[j] = sToken.z[k + 1]; + + cnt++; + j++; + k++; + continue; + } + + tmpTokenBuf[j] = sToken.z[k]; + j++; + } + + tmpTokenBuf[j] = 0; + sToken.z = tmpTokenBuf; + sToken.n -= 2 + cnt; + } + + bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_INDEX); + int32_t ret = tsParseOneColumnOld(pSchema, &sToken, start, pInsertParam->msg, str, isPrimaryKey, timePrec); + if (ret != TSDB_CODE_SUCCESS) { + return ret; + } + + if (isPrimaryKey && tsCheckTimestamp(pDataBlocks, start) != TSDB_CODE_SUCCESS) { + tscInvalidOperationMsg(pInsertParam->msg, "client time/server time can not be mixed up", sToken.z); + return TSDB_CODE_TSC_INVALID_TIME_STAMP; + } + } + + // 2. set the null value for the columns that do not assign values + if (spd->numOfBound < spd->numOfCols) { + char *ptr = payload; + + for (int32_t i = 0; i < spd->numOfCols; ++i) { + if (spd->cols[i].valStat == VAL_STAT_NONE) { // current column do not have any value to insert, set it to null + if (schema[i].type == TSDB_DATA_TYPE_BINARY) { + varDataSetLen(ptr, sizeof(int8_t)); + *(uint8_t *)varDataVal(ptr) = TSDB_DATA_BINARY_NULL; + } else if (schema[i].type == TSDB_DATA_TYPE_NCHAR) { + varDataSetLen(ptr, sizeof(int32_t)); + *(uint32_t *)varDataVal(ptr) = TSDB_DATA_NCHAR_NULL; + } else { + setNull(ptr, schema[i].type, schema[i].bytes); + } + } + + ptr += schema[i].bytes; + } + + rowSize = (int32_t)(ptr - payload); + } + + *len = rowSize; + return TSDB_CODE_SUCCESS; +} + int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, int32_t *len, char *tmpTokenBuf, SInsertStatementParam *pInsertParam) { int32_t index = 0; @@ -460,7 +823,7 @@ int tsParseOneRow(char **str, STableDataBlocks *pDataBlocks, int16_t timePrec, i STableMeta * pTableMeta = pDataBlocks->pTableMeta; SSchema * schema = tscGetTableSchema(pTableMeta); SMemRowBuilder * pBuilder = &pDataBlocks->rowBuilder; - int32_t dataLen = pBuilder->dataRowInitLen; + int32_t dataLen = spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE; int32_t kvLen = pBuilder->kvRowInitLen; bool isParseBindParam = false; @@ -1698,6 +2061,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow STableComInfo tinfo = tscGetTableInfo(pTableMeta); SInsertStatementParam* pInsertParam = &pCmd->insertParam; + pInsertParam->payloadType = PAYLOAD_TYPE_RAW; destroyTableNameList(pInsertParam); pInsertParam->pDataBlocks = tscDestroyBlockArrayList(pInsertParam->pDataBlocks); @@ -1726,12 +2090,6 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow goto _error; } - if (TSDB_CODE_SUCCESS != - (ret = initMemRowBuilder(&pTableDataBlock->rowBuilder, 0, tinfo.numOfColumns, pTableDataBlock->numOfParams, - pTableDataBlock->boundColumnInfo.allNullLen))) { - goto _error; - } - while ((readLen = tgetline(&line, &n, fp)) != -1) { if (('\r' == line[readLen - 1]) || ('\n' == line[readLen - 1])) { line[--readLen] = 0; @@ -1745,7 +2103,7 @@ static void parseFileSendDataBlock(void *param, TAOS_RES *tres, int32_t numOfRow strtolower(line, line); int32_t len = 0; - code = tsParseOneRow(&lineptr, pTableDataBlock, tinfo.precision, &len, tokenBuf, pInsertParam); + code = tsParseOneRowOld(&lineptr, pTableDataBlock, tinfo.precision, &len, tokenBuf, pInsertParam); if (code != TSDB_CODE_SUCCESS || pTableDataBlock->numOfParams > 0) { pSql->res.code = code; break; diff --git a/src/client/src/tscUtil.c b/src/client/src/tscUtil.c index 612d6a5642..f3e30172ab 100644 --- a/src/client/src/tscUtil.c +++ b/src/client/src/tscUtil.c @@ -1744,6 +1744,7 @@ int32_t tscCreateDataBlock(size_t defaultSize, int32_t rowSize, int32_t startOff dataBuf->tsSource = -1; dataBuf->vgId = dataBuf->pTableMeta->vgId; + tNameAssign(&dataBuf->tableName, name); assert(defaultSize > 0 && pTableMeta != NULL && dataBuf->pTableMeta != NULL);