TD-11819 Parsing insert statement and assembling binary objects.

This commit is contained in:
Xiaoyu Wang 2021-12-11 00:35:26 -05:00
parent 0f5dc11095
commit 77b8bdd56b
4 changed files with 252 additions and 656 deletions

View File

@ -544,7 +544,7 @@ void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder); void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder); SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, void *value) { static FORCE_INLINE int tdAddColToKVRow(SKVRowBuilder *pBuilder, int16_t colId, int8_t type, const void *value) {
if (pBuilder->nCols >= pBuilder->tCols) { if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2; pBuilder->tCols *= 2;
SColIdx* pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols); SColIdx* pColIdx = (SColIdx *)realloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);

View File

@ -42,7 +42,7 @@ typedef enum ERowCompareStat {
typedef struct SBoundColumn { typedef struct SBoundColumn {
int32_t offset; // all column offset value int32_t offset; // all column offset value
int32_t toffset; // first part offset for SDataRow TODO: get offset from STSchema on future int32_t toffset; // first part offset for SDataRow TODO: get offset from STSchema on future
uint8_t valStat; // denote if current column bound or not(0 means has val, 1 means no val) uint8_t valStat; // EValStat. denote if current column bound or not(0 means has val, 1 means no val)
} SBoundColumn; } SBoundColumn;
typedef struct { typedef struct {
@ -63,15 +63,7 @@ typedef struct SParsedDataColInfo {
int8_t orderStatus; // bound columns int8_t orderStatus; // bound columns
} SParsedDataColInfo; } SParsedDataColInfo;
typedef struct SParamInfo { typedef struct SMemRowInfo {
int32_t idx;
uint8_t type;
uint8_t timePrec;
int16_t bytes;
uint32_t offset;
} SParamInfo;
typedef struct {
int32_t dataLen; // len of SDataRow int32_t dataLen; // len of SDataRow
int32_t kvLen; // len of SKVRow int32_t kvLen; // len of SKVRow
} SMemRowInfo; } SMemRowInfo;
@ -83,15 +75,13 @@ typedef struct {
SMemRowInfo *rowInfo; SMemRowInfo *rowInfo;
} SMemRowBuilder; } SMemRowBuilder;
typedef struct SBlockKeyTuple { typedef struct SParamInfo {
TSKEY skey; int32_t idx;
void* payloadAddr; uint8_t type;
} SBlockKeyTuple; uint8_t timePrec;
int16_t bytes;
typedef struct SBlockKeyInfo { uint32_t offset;
int32_t maxBytesAlloc; } SParamInfo;
SBlockKeyTuple* pKeyTuple;
} SBlockKeyInfo;
typedef struct STableDataBlocks { typedef struct STableDataBlocks {
SName tableName; SName tableName;
@ -147,7 +137,7 @@ static FORCE_INLINE void appendMemRowColValEx(SMemRow row, const void *value, bo
} }
static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowType, SParsedDataColInfo *spd, static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowType, SParsedDataColInfo *spd,
int32_t idx, int32_t *toffset, int16_t *colId) { int32_t idx, int32_t *toffset) {
int32_t schemaIdx = 0; int32_t schemaIdx = 0;
if (IS_DATA_COL_ORDERED(spd)) { if (IS_DATA_COL_ORDERED(spd)) {
schemaIdx = spd->boundedColumns[idx]; schemaIdx = spd->boundedColumns[idx];
@ -165,10 +155,9 @@ static FORCE_INLINE void getMemRowAppendInfo(SSchema *pSchema, uint8_t memRowTyp
*toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SColIdx); *toffset = ((spd->colIdxInfo + idx)->finalIdx) * sizeof(SColIdx);
} }
} }
*colId = pSchema[schemaIdx].colId;
} }
static FORCE_INLINE void checkAndConvertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) { static FORCE_INLINE void convertMemRow(SMemRow row, int32_t dataLen, int32_t kvLen) {
if (isDataRow(row)) { if (isDataRow(row)) {
if (kvLen < (dataLen * KVRatioConvert)) { if (kvLen < (dataLen * KVRatioConvert)) {
memRowSetConvert(row); memRowSetConvert(row);

View File

@ -15,20 +15,24 @@
#include "dataBlockMgt.h" #include "dataBlockMgt.h"
// #include "astGenerator.h"
// #include "parserInt.h"
#include "catalog.h" #include "catalog.h"
#include "parserUtil.h" #include "parserUtil.h"
#include "queryInfoUtil.h" #include "queryInfoUtil.h"
// #include "ttoken.h"
// #include "function.h"
// #include "ttime.h"
// #include "tglobal.h"
#include "taosmsg.h" #include "taosmsg.h"
#define IS_RAW_PAYLOAD(t) \ #define IS_RAW_PAYLOAD(t) \
(((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert (((int)(t)) == PAYLOAD_TYPE_RAW) // 0: K-V payload for non-prepare insert, 1: rawPayload for prepare insert
typedef struct SBlockKeyTuple {
TSKEY skey;
void* payloadAddr;
} SBlockKeyTuple;
typedef struct SBlockKeyInfo {
int32_t maxBytesAlloc;
SBlockKeyTuple* pKeyTuple;
} SBlockKeyInfo;
static int32_t rowDataCompar(const void *lhs, const void *rhs) { static int32_t rowDataCompar(const void *lhs, const void *rhs) {
TSKEY left = *(TSKEY *)lhs; TSKEY left = *(TSKEY *)lhs;
TSKEY right = *(TSKEY *)rhs; TSKEY right = *(TSKEY *)rhs;
@ -189,8 +193,7 @@ static int32_t getRowExpandSize(STableMeta* pTableMeta) {
* TODO: Move to tdataformat.h and refactor when STSchema available. * TODO: Move to tdataformat.h and refactor when STSchema available.
* - fetch flen and toffset from STSChema and remove param spd * - fetch flen and toffset from STSChema and remove param spd
*/ */
static FORCE_INLINE void convertToSDataRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, static FORCE_INLINE void convertToSDataRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, SParsedDataColInfo *spd) {
SParsedDataColInfo *spd) {
ASSERT(isKvRow(src)); ASSERT(isKvRow(src));
SKVRow kvRow = memRowKvBody(src); SKVRow kvRow = memRowKvBody(src);
SDataRow dataRow = memRowDataBody(dest); SDataRow dataRow = memRowDataBody(dest);
@ -209,8 +212,7 @@ static FORCE_INLINE void convertToSDataRow(SMemRow dest, SMemRow src, SSchema *p
} }
// TODO: Move to tdataformat.h and refactor when STSchema available. // TODO: Move to tdataformat.h and refactor when STSchema available.
static FORCE_INLINE void convertToSKVRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, int nBoundCols, static FORCE_INLINE void convertToSKVRow(SMemRow dest, SMemRow src, SSchema *pSchema, int nCols, int nBoundCols, SParsedDataColInfo *spd) {
SParsedDataColInfo *spd) {
ASSERT(isDataRow(src)); ASSERT(isDataRow(src));
SDataRow dataRow = memRowDataBody(src); SDataRow dataRow = memRowDataBody(src);
@ -485,22 +487,7 @@ static int trimDataBlock(void* pDataBlock, STableDataBlocks* pTableDataBlock, SB
} }
static void extractTableNameList(SHashObj* pHashObj, bool freeBlockMap) { static void extractTableNameList(SHashObj* pHashObj, bool freeBlockMap) {
// pInsertParam->numOfTables = (int32_t) taosHashGetSize(pInsertParam->pTableBlockHashList); // todo
// if (pInsertParam->pTableNameList == NULL) {
// pInsertParam->pTableNameList = malloc(pInsertParam->numOfTables * POINTER_BYTES);
// }
// STableDataBlocks **p1 = taosHashIterate(pInsertParam->pTableBlockHashList, NULL);
// int32_t i = 0;
// while(p1) {
// STableDataBlocks* pBlocks = *p1;
// pInsertParam->pTableNameList[i++] = tNameDup(&pBlocks->tableName);
// p1 = taosHashIterate(pInsertParam->pTableBlockHashList, p1);
// }
// if (freeBlockMap) {
// pInsertParam->pTableBlockHashList = tscDestroyBlockHashTable(pInsertParam->pTableBlockHashList, false);
// }
} }
int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap) { int32_t mergeTableDataBlocks(SHashObj* pHashObj, int8_t schemaAttached, uint8_t payloadType, bool freeBlockMap) {

View File

@ -15,7 +15,6 @@
#include "insertParser.h" #include "insertParser.h"
// #include "astGenerator.h"
#include "dataBlockMgt.h" #include "dataBlockMgt.h"
#include "parserInt.h" #include "parserInt.h"
#include "parserUtil.h" #include "parserUtil.h"
@ -23,7 +22,6 @@
#include "tglobal.h" #include "tglobal.h"
#include "ttime.h" #include "ttime.h"
#include "ttoken.h" #include "ttoken.h"
// #include "function.h"
#include "ttypes.h" #include "ttypes.h"
#define NEXT_TOKEN(pSql, sToken) \ #define NEXT_TOKEN(pSql, sToken) \
@ -308,518 +306,249 @@ static int parseTime(SInsertParseContext* pCxt, SToken *pToken, int16_t timePrec
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t parseOneColumn(SInsertParseContext* pCxt, SToken* pToken, SSchema* pSchema, bool primaryKey, int16_t timePrec, char* payload) { typedef int32_t (*FRowAppend)(const void *value, int32_t len, void *param);
int64_t iv;
int32_t ret; typedef struct SKvParam {
char *endptr = NULL; char buf[TSDB_MAX_TAGS_LEN];
SKVRowBuilder* builder;
SSchema* schema;
} SKvParam;
static FORCE_INLINE int32_t KvRowAppend(const void *value, int32_t len, void *param) {
SKvParam* pa = (SKvParam*)param;
if (TSDB_DATA_TYPE_BINARY == pa->schema->type) {
STR_WITH_SIZE_TO_VARSTR(pa->buf, value, len);
tdAddColToKVRow(pa->builder, pa->schema->colId, pa->schema->type, pa->buf);
} else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
if (!taosMbsToUcs4(value, len, varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
varDataSetLen(pa->buf, output);
tdAddColToKVRow(pa->builder, pa->schema->colId, pa->schema->type, pa->buf);
} else {
tdAddColToKVRow(pa->builder, pa->schema->colId, pa->schema->type, value);
}
return TSDB_CODE_SUCCESS;
}
typedef struct SMemParam {
SMemRow row;
SSchema* schema;
int32_t toffset;
uint8_t compareStat;
int32_t dataLen;
int32_t kvLen;
} SMemParam;
static FORCE_INLINE int32_t MemRowAppend(const void *value, int32_t len, void *param) {
SMemParam* pa = (SMemParam*)param;
if (TSDB_DATA_TYPE_BINARY == pa->schema->type) {
char *rowEnd = memRowEnd(pa->row);
STR_WITH_SIZE_TO_VARSTR(rowEnd, value, len);
appendMemRowColValEx(pa->row, rowEnd, true, pa->schema->colId, pa->schema->type, pa->toffset, &pa->dataLen, &pa->kvLen, pa->compareStat);
} else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
char * rowEnd = memRowEnd(pa->row);
if (!taosMbsToUcs4(value, len, (char *)varDataVal(rowEnd), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
varDataSetLen(rowEnd, output);
appendMemRowColValEx(pa->row, rowEnd, false, pa->schema->colId, pa->schema->type, pa->toffset, &pa->dataLen, &pa->kvLen, pa->compareStat);
} else {
appendMemRowColValEx(pa->row, value, true, pa->schema->colId, pa->schema->type, pa->toffset, &pa->dataLen, &pa->kvLen, pa->compareStat);
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t checkAndTrimValue(SInsertParseContext* pCxt, SToken* pToken, SSchema* pSchema, char* tmpTokenBuf) {
int16_t type = pToken->type;
if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) ||
(pToken->n == 0) || (type == TK_RP)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pToken->z);
}
if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) { if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid numeric data", pToken->z); return buildSyntaxErrMsg(&pCxt->msg, "invalid numeric data", pToken->z);
} }
switch (pSchema->type) { // Remove quotation marks
case TSDB_DATA_TYPE_BOOL: { if (TK_STRING == type) {
if (isNullStr(pToken)) { if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
*((uint8_t *)payload) = TSDB_DATA_BOOL_NULL; return buildSyntaxErrMsg(&pCxt->msg, "too long string", pToken->z);
} else {
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
if (strncmp(pToken->z, "true", pToken->n) == 0) {
*(uint8_t *)payload = TSDB_TRUE;
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
*(uint8_t *)payload = TSDB_FALSE;
} else {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_INTEGER) {
iv = strtoll(pToken->z, NULL, 10);
*(uint8_t *)payload = (int8_t)((iv == 0) ? TSDB_FALSE : TSDB_TRUE);
} else if (pToken->type == TK_FLOAT) {
double dv = strtod(pToken->z, NULL);
*(uint8_t *)payload = (int8_t)((dv == 0) ? TSDB_FALSE : TSDB_TRUE);
} else {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
}
}
break;
} }
case TSDB_DATA_TYPE_TINYINT: // delete escape character: \\, \', \"
if (isNullStr(pToken)) { char delim = pToken->z[0];
*((uint8_t *)payload) = TSDB_DATA_TINYINT_NULL; int32_t cnt = 0;
} else { int32_t j = 0;
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true); for (uint32_t k = 1; k < pToken->n - 1; ++k) {
if (ret != TSDB_CODE_SUCCESS) { if (pToken->z[k] == '\\' || (pToken->z[k] == delim && pToken->z[k + 1] == delim)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid tinyint data", pToken->z); tmpTokenBuf[j] = pToken->z[k + 1];
} else if (!IS_VALID_TINYINT(iv)) { cnt++;
return buildSyntaxErrMsg(&pCxt->msg, "data overflow", pToken->z); j++;
} k++;
*((uint8_t *)payload) = (uint8_t)iv; continue;
} }
break; tmpTokenBuf[j] = pToken->z[k];
case TSDB_DATA_TYPE_UTINYINT: j++;
if (isNullStr(pToken)) {
*((uint8_t *)payload) = TSDB_DATA_UTINYINT_NULL;
} else {
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned tinyint data", pToken->z);
} else if (!IS_VALID_UTINYINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned tinyint data overflow", pToken->z);
}
*((uint8_t *)payload) = (uint8_t)iv;
}
break;
case TSDB_DATA_TYPE_SMALLINT:
if (isNullStr(pToken)) {
*((int16_t *)payload) = TSDB_DATA_SMALLINT_NULL;
} else {
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid smallint data", pToken->z);
} else if (!IS_VALID_SMALLINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "smallint data overflow", pToken->z);
}
*((int16_t *)payload) = (int16_t)iv;
}
break;
case TSDB_DATA_TYPE_USMALLINT:
if (isNullStr(pToken)) {
*((uint16_t *)payload) = TSDB_DATA_USMALLINT_NULL;
} else {
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned smallint data", pToken->z);
} else if (!IS_VALID_USMALLINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned smallint data overflow", pToken->z);
}
*((uint16_t *)payload) = (uint16_t)iv;
}
break;
case TSDB_DATA_TYPE_INT:
if (isNullStr(pToken)) {
*((int32_t *)payload) = TSDB_DATA_INT_NULL;
} else {
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid int data", pToken->z);
} else if (!IS_VALID_INT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "int data overflow", pToken->z);
}
*((int32_t *)payload) = (int32_t)iv;
}
break;
case TSDB_DATA_TYPE_UINT:
if (isNullStr(pToken)) {
*((uint32_t *)payload) = TSDB_DATA_UINT_NULL;
} else {
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned int data", pToken->z);
} else if (!IS_VALID_UINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned int data overflow", pToken->z);
}
*((uint32_t *)payload) = (uint32_t)iv;
}
break;
case TSDB_DATA_TYPE_BIGINT:
if (isNullStr(pToken)) {
*((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
} else {
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bigint data", pToken->z);
} else if (!IS_VALID_BIGINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "bigint data overflow", pToken->z);
}
*((int64_t *)payload) = iv;
}
break;
case TSDB_DATA_TYPE_UBIGINT:
if (isNullStr(pToken)) {
*((uint64_t *)payload) = TSDB_DATA_UBIGINT_NULL;
} else {
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned bigint data", pToken->z);
} else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned bigint data overflow", pToken->z);
}
*((uint64_t *)payload) = iv;
}
break;
case TSDB_DATA_TYPE_FLOAT:
if (isNullStr(pToken)) {
*((int32_t *)payload) = TSDB_DATA_FLOAT_NULL;
} else {
double dv;
if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
}
SET_FLOAT_VAL(payload, dv);
}
break;
case TSDB_DATA_TYPE_DOUBLE:
if (isNullStr(pToken)) {
*((int64_t *)payload) = TSDB_DATA_DOUBLE_NULL;
} else {
double dv;
if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z);
}
*((double *)payload) = dv;
}
break;
case TSDB_DATA_TYPE_BINARY:
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost
if (pToken->type == TK_NULL) {
setVardataNull(payload, TSDB_DATA_TYPE_BINARY);
} else { // too long values will return invalid sql, not be truncated automatically
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { //todo refactor
return buildSyntaxErrMsg(&pCxt->msg, "string data overflow", pToken->z);
}
STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
}
break;
case TSDB_DATA_TYPE_NCHAR:
if (pToken->type == TK_NULL) {
setVardataNull(payload, TSDB_DATA_TYPE_NCHAR);
} else {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
if (!taosMbsToUcs4(pToken->z, pToken->n, varDataVal(payload), pSchema->bytes - VARSTR_HEADER_SIZE, &output)) {
char buf[512] = {0};
snprintf(buf, tListLen(buf), "%s", strerror(errno));
return buildSyntaxErrMsg(&pCxt->msg, buf, pToken->z);
}
varDataSetLen(payload, output);
}
break;
case TSDB_DATA_TYPE_TIMESTAMP: {
if (pToken->type == TK_NULL) {
if (primaryKey) {
*((int64_t *)payload) = 0;
} else {
*((int64_t *)payload) = TSDB_DATA_BIGINT_NULL;
}
} else {
int64_t temp;
if (parseTime(pCxt, pToken, timePrec, &temp) != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp", pToken->z);
}
*((int64_t *)payload) = temp;
}
break;
} }
tmpTokenBuf[j] = 0;
pToken->z = tmpTokenBuf;
pToken->n -= 2 + cnt;
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static FORCE_INLINE int32_t parseOneColumnKV(SInsertParseContext* pCxt, SToken* pToken, SSchema* pSchema, bool primaryKey, int16_t timePrec, static FORCE_INLINE int32_t parseOneValue(SInsertParseContext* pCxt, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, FRowAppend func, void* param) {
SMemRow row, int32_t toffset, int16_t colId, int32_t* dataLen, int32_t* kvLen, uint8_t compareStat) {
int64_t iv; int64_t iv;
int32_t ret; int32_t ret;
char * endptr = NULL; char * endptr = NULL;
if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) { CHECK_CODE(checkAndTrimValue(pCxt, pToken, pSchema, tmpTokenBuf));
return buildSyntaxErrMsg(&pCxt->msg, "invalid numeric data", pToken->z);
if (isNullStr(pToken)) {
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
int64_t tmpVal = 0;
return func(&tmpVal, pSchema->bytes, param);
}
return func(getNullValue(pSchema->type), 0, param);
} }
switch (pSchema->type) { switch (pSchema->type) {
case TSDB_DATA_TYPE_BOOL: { // bool case TSDB_DATA_TYPE_BOOL: {
if (isNullStr(pToken)) { if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, if (strncmp(pToken->z, "true", pToken->n) == 0) {
compareStat); return func(&TRUE_VALUE, pSchema->bytes, param);
} else { } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) { return func(&FALSE_VALUE, pSchema->bytes, param);
if (strncmp(pToken->z, "true", pToken->n) == 0) {
appendMemRowColValEx(row, &TRUE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
appendMemRowColValEx(row, &FALSE_VALUE, true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
} else {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_INTEGER) {
iv = strtoll(pToken->z, NULL, 10);
appendMemRowColValEx(row, ((iv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset,
dataLen, kvLen, compareStat);
} else if (pToken->type == TK_FLOAT) {
double dv = strtod(pToken->z, NULL);
appendMemRowColValEx(row, ((dv == 0) ? &FALSE_VALUE : &TRUE_VALUE), true, colId, pSchema->type, toffset,
dataLen, kvLen, compareStat);
} else { } else {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z); return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
} }
} else if (pToken->type == TK_INTEGER) {
return func(((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
} else if (pToken->type == TK_FLOAT) {
return func(((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
} else {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
} }
break; break;
} }
case TSDB_DATA_TYPE_TINYINT: {
case TSDB_DATA_TYPE_TINYINT: if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) {
if (isNullStr(pToken)) { return buildSyntaxErrMsg(&pCxt->msg, "invalid tinyint data", pToken->z);
appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, } else if (!IS_VALID_TINYINT(iv)) {
compareStat); return buildSyntaxErrMsg(&pCxt->msg, "data overflow", pToken->z);
} else {
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid tinyint data", pToken->z);
} else if (!IS_VALID_TINYINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "data overflow", pToken->z);
}
uint8_t tmpVal = (uint8_t)iv;
appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
} }
uint8_t tmpVal = (uint8_t)iv;
break; return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_UTINYINT: case TSDB_DATA_TYPE_UTINYINT:{
if (isNullStr(pToken)) { if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) {
appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned tinyint data", pToken->z);
compareStat); } else if (!IS_VALID_UTINYINT(iv)) {
} else { return buildSyntaxErrMsg(&pCxt->msg, "unsigned tinyint data overflow", pToken->z);
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned tinyint data", pToken->z);
} else if (!IS_VALID_UTINYINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned tinyint data overflow", pToken->z);
}
uint8_t tmpVal = (uint8_t)iv;
appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
} }
uint8_t tmpVal = (uint8_t)iv;
break; return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_SMALLINT: case TSDB_DATA_TYPE_SMALLINT: {
if (isNullStr(pToken)) { if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) {
appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, return buildSyntaxErrMsg(&pCxt->msg, "invalid smallint data", pToken->z);
compareStat); } else if (!IS_VALID_SMALLINT(iv)) {
} else { return buildSyntaxErrMsg(&pCxt->msg, "smallint data overflow", pToken->z);
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid smallint data", pToken->z);
} else if (!IS_VALID_SMALLINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "smallint data overflow", pToken->z);
}
int16_t tmpVal = (int16_t)iv;
appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
} }
int16_t tmpVal = (int16_t)iv;
break; return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_USMALLINT: case TSDB_DATA_TYPE_USMALLINT: {
if (isNullStr(pToken)) { if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) {
appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned smallint data", pToken->z);
compareStat); } else if (!IS_VALID_USMALLINT(iv)) {
} else { return buildSyntaxErrMsg(&pCxt->msg, "unsigned smallint data overflow", pToken->z);
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned smallint data", pToken->z);
} else if (!IS_VALID_USMALLINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned smallint data overflow", pToken->z);
}
uint16_t tmpVal = (uint16_t)iv;
appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
} }
uint16_t tmpVal = (uint16_t)iv;
break; return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_INT: case TSDB_DATA_TYPE_INT: {
if (isNullStr(pToken)) { if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) {
appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, return buildSyntaxErrMsg(&pCxt->msg, "invalid int data", pToken->z);
compareStat); } else if (!IS_VALID_INT(iv)) {
} else { return buildSyntaxErrMsg(&pCxt->msg, "int data overflow", pToken->z);
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid int data", pToken->z);
} else if (!IS_VALID_INT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "int data overflow", pToken->z);
}
int32_t tmpVal = (int32_t)iv;
appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
} }
int32_t tmpVal = (int32_t)iv;
break; return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_UINT: case TSDB_DATA_TYPE_UINT: {
if (isNullStr(pToken)) { if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) {
appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned int data", pToken->z);
compareStat); } else if (!IS_VALID_UINT(iv)) {
} else { return buildSyntaxErrMsg(&pCxt->msg, "unsigned int data overflow", pToken->z);
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned int data", pToken->z);
} else if (!IS_VALID_UINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned int data overflow", pToken->z);
}
uint32_t tmpVal = (uint32_t)iv;
appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
} }
uint32_t tmpVal = (uint32_t)iv;
break; return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_BIGINT: case TSDB_DATA_TYPE_BIGINT: {
if (isNullStr(pToken)) { if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) {
appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, return buildSyntaxErrMsg(&pCxt->msg, "invalid bigint data", pToken->z);
compareStat); } else if (!IS_VALID_BIGINT(iv)) {
} else { return buildSyntaxErrMsg(&pCxt->msg, "bigint data overflow", pToken->z);
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, true);
if (ret != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bigint data", pToken->z);
} else if (!IS_VALID_BIGINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "bigint data overflow", pToken->z);
}
appendMemRowColValEx(row, &iv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
} }
break; return func(&iv, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_UBIGINT: case TSDB_DATA_TYPE_UBIGINT: {
if (isNullStr(pToken)) { if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) {
appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned bigint data", pToken->z);
compareStat); } else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
} else { return buildSyntaxErrMsg(&pCxt->msg, "unsigned bigint data overflow", pToken->z);
ret = toInt64(pToken->z, pToken->type, pToken->n, &iv, false);
if (ret != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned bigint data", pToken->z);
} else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned bigint data overflow", pToken->z);
}
uint64_t tmpVal = (uint64_t)iv;
appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
} }
break; uint64_t tmpVal = (uint64_t)iv;
return func(&tmpVal, pSchema->bytes, param);
case TSDB_DATA_TYPE_FLOAT: }
if (isNullStr(pToken)) { case TSDB_DATA_TYPE_FLOAT: {
appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, double dv;
compareStat); if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
} else { return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
double dv;
if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) ||
isnan(dv)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
}
float tmpVal = (float)dv;
appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
} }
break; if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
case TSDB_DATA_TYPE_DOUBLE:
if (isNullStr(pToken)) {
appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
} else {
double dv;
if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z);
}
appendMemRowColValEx(row, &dv, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
} }
break; float tmpVal = (float)dv;
return func(&tmpVal, pSchema->bytes, param);
case TSDB_DATA_TYPE_BINARY: }
// binary data cannot be null-terminated char string, otherwise the last char of the string is lost case TSDB_DATA_TYPE_DOUBLE: {
if (pToken->type == TK_NULL) { double dv;
appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen, if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
compareStat); return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z);
} else { // too long values will return invalid sql, not be truncated automatically
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
return buildSyntaxErrMsg(&pCxt->msg, "string data overflow", pToken->z);
}
// STR_WITH_SIZE_TO_VARSTR(payload, pToken->z, pToken->n);
char *rowEnd = memRowEnd(row);
STR_WITH_SIZE_TO_VARSTR(rowEnd, pToken->z, pToken->n);
appendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
} }
break; if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z);
case TSDB_DATA_TYPE_NCHAR:
if (pToken->type == TK_NULL) {
appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
} else {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
char * rowEnd = memRowEnd(row);
if (!taosMbsToUcs4(pToken->z, pToken->n, (char *)varDataVal(rowEnd), pSchema->bytes - VARSTR_HEADER_SIZE,
&output)) {
char buf[512] = {0};
snprintf(buf, tListLen(buf), "%s", strerror(errno));
return buildSyntaxErrMsg(&pCxt->msg, buf, pToken->z);
}
varDataSetLen(rowEnd, output);
appendMemRowColValEx(row, rowEnd, false, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
} }
break; return func(&dv, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_BINARY: {
// too long values will return invalid sql, not be truncated automatically
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { // todo refactor
return buildSyntaxErrMsg(&pCxt->msg, "string data overflow", pToken->z);
}
return func(pToken->z, pToken->n, param);
}
case TSDB_DATA_TYPE_NCHAR: {
return func(pToken->z, pToken->n, param);
}
case TSDB_DATA_TYPE_TIMESTAMP: { case TSDB_DATA_TYPE_TIMESTAMP: {
if (pToken->type == TK_NULL) { int64_t tmpVal;
if (primaryKey) { if (parseTime(pCxt, pToken, timePrec, &tmpVal) != TSDB_CODE_SUCCESS) {
// When building SKVRow primaryKey, we should not skip even with NULL value. return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp", pToken->z);
int64_t tmpVal = 0;
appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
} else {
appendMemRowColValEx(row, getNullValue(pSchema->type), true, colId, pSchema->type, toffset, dataLen, kvLen,
compareStat);
}
} else {
int64_t tmpVal;
if (parseTime(pCxt, pToken, timePrec, &tmpVal) != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp", pToken->z);
}
appendMemRowColValEx(row, &tmpVal, true, colId, pSchema->type, toffset, dataLen, kvLen, compareStat);
} }
return func(&tmpVal, pSchema->bytes, param);
break;
} }
} }
return TSDB_CODE_SUCCESS; return TSDB_CODE_FAILED;
} }
// pSql -> tag1_name, ...) // pSql -> tag1_name, ...)
@ -892,35 +621,17 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SParsedDataColInfo* pS
return TSDB_CODE_TSC_OUT_OF_MEMORY; return TSDB_CODE_TSC_OUT_OF_MEMORY;
} }
SKvParam param = {.builder = &kvRowBuilder};
SToken sToken; SToken sToken;
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
for (int i = 0; i < pSpd->numOfBound; ++i) { for (int i = 0; i < pSpd->numOfBound; ++i) {
SSchema* pSchema = &pTagsSchema[pSpd->boundedColumns[i]];
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_TOKEN(pCxt->pSql, sToken);
SSchema* pSchema = &pTagsSchema[pSpd->boundedColumns[i]];
if (TK_ILLEGAL == sToken.type) { param.schema = pSchema;
tdDestroyKVRowBuilder(&kvRowBuilder); CHECK_CODE_2(parseOneValue(pCxt, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, &param), tdDestroyKVRowBuilder(&kvRowBuilder), destroyBoundColumnInfo(pSpd));
destroyBoundColumnInfo(pSpd);
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
if (sToken.n == 0 || sToken.type == TK_RP) {
break;
}
// Remove quotation marks
if (TK_STRING == sToken.type) {
sToken.z++;
sToken.n -= 2;
}
char tagVal[TSDB_MAX_TAGS_LEN];
CHECK_CODE_2(parseOneColumn(pCxt, &sToken, pSchema, false, precision, tagVal), tdDestroyKVRowBuilder(&kvRowBuilder), destroyBoundColumnInfo(pSpd));
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
} }
destroyBoundColumnInfo(pSpd); destroyBoundColumnInfo(pSpd);
SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder); SKVRow row = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder); tdDestroyKVRowBuilder(&kvRowBuilder);
if (NULL == row) { if (NULL == row) {
@ -928,19 +639,9 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SParsedDataColInfo* pS
} }
tdSortKVRowByColIdx(row); tdSortKVRowByColIdx(row);
// pInsertParam->tagData.dataLen = kvRowLen(row); // todo construct payload
// if (pInsertParam->tagData.dataLen <= 0){
// return buildInvalidOperationMsg(msgBuf, "tag value expected");
// }
// char* pTag = realloc(pInsertParam->tagData.data, pInsertParam->tagData.dataLen);
// if (pTag == NULL) {
// return TSDB_CODE_TSC_OUT_OF_MEMORY;
// }
// kvRowCpy(pTag, row);
tfree(row); tfree(row);
// pInsertParam->tagData.data = pTag;
} }
// pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...) // pSql -> stb_name [(tag1_name, ...)] TAGS (tag1_value, ...)
@ -979,98 +680,26 @@ static int32_t parseUsingClause(SInsertParseContext* pCxt, SToken* pTbnameToken)
} }
static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, int32_t* len, char* tmpTokenBuf) { static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks, int16_t timePrec, int32_t* len, char* tmpTokenBuf) {
int32_t index = 0; SParsedDataColInfo* spd = &pDataBlocks->boundColumnInfo;
SToken sToken = {0}; SMemRowBuilder* pBuilder = &pDataBlocks->rowBuilder;
char *row = pDataBlocks->pData + pDataBlocks->size; // skip the SSubmitBlk header char *row = pDataBlocks->pData + pDataBlocks->size; // skip the SSubmitBlk header
SParsedDataColInfo *spd = &pDataBlocks->boundColumnInfo;
STableMeta * pTableMeta = pDataBlocks->pTableMeta;
SSchema * schema = getTableColumnSchema(pTableMeta);
SMemRowBuilder * pBuilder = &pDataBlocks->rowBuilder;
int32_t dataLen = spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE;
int32_t kvLen = pBuilder->kvRowInitLen;
bool isParseBindParam = false;
initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound); initSMemRow(row, pBuilder->memRowType, pDataBlocks, spd->numOfBound);
bool isParseBindParam = false;
SSchema* schema = getTableColumnSchema(pDataBlocks->pTableMeta);
SMemParam param = {.row = row};
SToken sToken = {0};
// 1. set the parsed value from sql string // 1. set the parsed value from sql string
for (int i = 0; i < spd->numOfBound; ++i) { for (int i = 0; i < spd->numOfBound; ++i) {
// the start position in data block buffer of current value in sql
int32_t colIndex = spd->boundedColumns[i];
char *start = row + spd->cols[colIndex].offset;
SSchema *pSchema = &schema[colIndex]; // get colId here
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_TOKEN(pCxt->pSql, sToken);
// todo bind param
SSchema *pSchema = &schema[spd->boundedColumns[i]];
param.schema = pSchema;
param.compareStat = pBuilder->compareStat;
getMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &param.toffset);
CHECK_CODE(parseOneValue(pCxt, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, &param));
// if (sToken.type == TK_QUESTION) { if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
// if (!isParseBindParam) {
// isParseBindParam = true;
// }
// if (pInsertParam->insertType != TSDB_QUERY_TYPE_STMT_INSERT) {
// return buildSyntaxErrMsg(pInsertParam->msg, "? only allowed in binding insertion", *str);
// }
// uint32_t offset = (uint32_t)(start - pDataBlocks->pData);
// if (tscAddParamToDataBlock(pDataBlocks, pSchema->type, (uint8_t)timePrec, pSchema->bytes, offset) != NULL) {
// continue;
// }
// strcpy(pInsertParam->msg, "client out of memory");
// return TSDB_CODE_TSC_OUT_OF_MEMORY;
// }
int16_t type = sToken.type;
if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) ||
(sToken.n == 0) || (type == TK_RP)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", sToken.z);
}
// Remove quotation marks
if (TK_STRING == sToken.type) {
// delete escape character: \\, \', \"
char delim = sToken.z[0];
int32_t cnt = 0;
int32_t j = 0;
if (sToken.n >= TSDB_MAX_BYTES_PER_ROW) {
return buildSyntaxErrMsg(&pCxt->msg, "too long string", sToken.z);
}
for (uint32_t k = 1; k < sToken.n - 1; ++k) {
if (sToken.z[k] == '\\' || (sToken.z[k] == delim && sToken.z[k + 1] == delim)) {
tmpTokenBuf[j] = sToken.z[k + 1];
cnt++;
j++;
k++;
continue;
}
tmpTokenBuf[j] = sToken.z[k];
j++;
}
tmpTokenBuf[j] = 0;
sToken.z = tmpTokenBuf;
sToken.n -= 2 + cnt;
}
bool isPrimaryKey = (colIndex == PRIMARYKEY_TIMESTAMP_COL_ID);
int32_t toffset = -1;
int16_t colId = -1;
getMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &toffset, &colId);
int32_t ret = parseOneColumnKV(pCxt, &sToken, pSchema, isPrimaryKey, timePrec, row, toffset,
colId, &dataLen, &kvLen, pBuilder->compareStat);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
if (isPrimaryKey) {
TSKEY tsKey = memRowKey(row); TSKEY tsKey = memRowKey(row);
if (checkTimestamp(pDataBlocks, (const char *)&tsKey) != TSDB_CODE_SUCCESS) { if (checkTimestamp(pDataBlocks, (const char *)&tsKey) != TSDB_CODE_SUCCESS) {
buildSyntaxErrMsg(&pCxt->msg, "client time/server time can not be mixed up", sToken.z); buildSyntaxErrMsg(&pCxt->msg, "client time/server time can not be mixed up", sToken.z);
@ -1082,7 +711,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
if (!isParseBindParam) { if (!isParseBindParam) {
// 2. check and set convert flag // 2. check and set convert flag
if (pBuilder->compareStat == ROW_COMPARE_NEED) { if (pBuilder->compareStat == ROW_COMPARE_NEED) {
checkAndConvertMemRow(row, dataLen, kvLen); convertMemRow(row, spd->allNullLen + TD_MEM_ROW_DATA_HEAD_SIZE, pBuilder->kvRowInitLen);
} }
// 3. set the null value for the columns that do not assign values // 3. set the null value for the columns that do not assign values
@ -1097,17 +726,17 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
} }
*len = getExtendedRowSize(pDataBlocks); *len = getExtendedRowSize(pDataBlocks);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// pSql -> (field1_value, ...) [(field1_value2, ...) ...] // pSql -> (field1_value, ...) [(field1_value2, ...) ...]
static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows, char* tmpTokenBuf) { static int32_t parseValues(SInsertParseContext* pCxt, STableDataBlocks* pDataBlock, int maxRows, int32_t* numOfRows) {
STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta); STableComInfo tinfo = getTableInfo(pDataBlock->pTableMeta);
int32_t extendedRowSize = getExtendedRowSize(pDataBlock); int32_t extendedRowSize = getExtendedRowSize(pDataBlock);
CHECK_CODE(initMemRowBuilder(&pDataBlock->rowBuilder, 0, tinfo.numOfColumns, pDataBlock->boundColumnInfo.numOfBound, pDataBlock->boundColumnInfo.allNullLen)); CHECK_CODE(initMemRowBuilder(&pDataBlock->rowBuilder, 0, tinfo.numOfColumns, pDataBlock->boundColumnInfo.numOfBound, pDataBlock->boundColumnInfo.allNullLen));
(*numOfRows) = 0; (*numOfRows) = 0;
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \"
SToken sToken; SToken sToken;
while (1) { while (1) {
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_TOKEN(pCxt->pSql, sToken);
@ -1145,8 +774,7 @@ static int32_t parseValuesClause(SInsertParseContext* pCxt, STableDataBlocks* da
CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows)); CHECK_CODE(allocateMemIfNeed(dataBuf, getExtendedRowSize(dataBuf), &maxNumOfRows));
int32_t numOfRows = 0; int32_t numOfRows = 0;
char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // used for deleting Escape character: \\, \', \" CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows));
CHECK_CODE(parseValues(pCxt, dataBuf, maxNumOfRows, &numOfRows, tmpTokenBuf));
for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) { for (uint32_t i = 0; i < dataBuf->numOfParams; ++i) {
SParamInfo *param = dataBuf->params + i; SParamInfo *param = dataBuf->params + i;
@ -1186,7 +814,6 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
} }
SToken tbnameToken = sToken; SToken tbnameToken = sToken;
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_TOKEN(pCxt->pSql, sToken);
// USING cluase // USING cluase
@ -1203,12 +830,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
if (TK_LP == sToken.type) { if (TK_LP == sToken.type) {
// pSql -> field1_name, ...) // pSql -> field1_name, ...)
NEXT_TOKEN(pCxt->pSql, sToken); CHECK_CODE_1(parseBoundColumns(pCxt, &dataBuf->boundColumnInfo, getTableColumnSchema(pCxt->pTableMeta)), destroyBoundColumnInfo(&dataBuf->boundColumnInfo));
// todo col_list
NEXT_TOKEN(pCxt->pSql, sToken);
if (TK_RP != sToken.type) {
return buildSyntaxErrMsg(&pCxt->msg, ") is expected", sToken.z);
}
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_TOKEN(pCxt->pSql, sToken);
} }
@ -1222,11 +844,9 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
if (TK_FILE == sToken.type) { if (TK_FILE == sToken.type) {
// pSql -> csv_file_path // pSql -> csv_file_path
NEXT_TOKEN(pCxt->pSql, sToken); NEXT_TOKEN(pCxt->pSql, sToken);
if (0 == sToken.n || (TK_STRING != sToken.type && TK_ID != sToken.type)) { if (0 == sToken.n || (TK_STRING != sToken.type && TK_ID != sToken.type)) {
return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z); return buildSyntaxErrMsg(&pCxt->msg, "file path is required following keyword FILE", sToken.z);
} }
// todo // todo
continue; continue;
} }