Merge pull request #9471 from taosdata/feature/3.0_liaohj

[td-11818] refactor create child table procedure.
This commit is contained in:
Haojun Liao 2021-12-29 11:48:20 +08:00 committed by GitHub
commit ea3f05f667
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 540 additions and 585 deletions

View File

@ -27,7 +27,7 @@ typedef struct SParseContext {
int8_t schemaAttached; // denote if submit block is built with table schema or not
const char *pSql; // sql string
size_t sqlLen; // length of the sql string
char *pMsg; // extended error message if exists to help avoid the problem in sql statement.
char *pMsg; // extended error message if exists to help identifying the problem in sql statement.
int32_t msgLen; // max length of the msg
} SParseContext;

View File

@ -181,43 +181,13 @@ int32_t execDdlQuery(SRequestObj* pRequest, SQueryNode* pQuery) {
pRequest->body.requestMsg = (SDataBuf){.pData = pDcl->pMsg, .len = pDcl->msgLen};
STscObj* pTscObj = pRequest->pTscObj;
SMsgSendInfo* pSendMsg = buildSendMsgInfoImpl(pRequest);
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
if (pDcl->msgType == TDMT_VND_CREATE_TABLE) {
// struct SCatalog* pCatalog = NULL;
//
// char buf[18] = {0};
// sprintf(buf, "%" PRId64, pRequest->pTscObj->pAppInfo->clusterId);
// int32_t code = catalogGetHandle(buf, &pCatalog);
// if (code != TSDB_CODE_SUCCESS) {
// return code;
// }
//
// SCreateTableMsg* pMsg = pSendMsg->msgInfo.pData;
//
// SName t = {0};
// tNameFromString(&t, pMsg->name, T_NAME_ACCT|T_NAME_DB|T_NAME_TABLE);
//
// char db[TSDB_DB_NAME_LEN + TSDB_NAME_DELIMITER_LEN + TSDB_ACCT_ID_LEN] = {0};
// tNameGetFullDbName(&t, db);
//
// SVgroupInfo info = {0};
// catalogGetTableHashVgroup(pCatalog, pRequest->pTscObj->pTransporter, pEpSet, db, tNameGetTableName(&t), &info);
//
int64_t transporterId = 0;
// SEpSet ep = {0};
// ep.inUse = info.inUse;
// ep.numOfEps = info.numOfEps;
// for(int32_t i = 0; i < ep.numOfEps; ++i) {
// ep.port[i] = info.epAddr[i].port;
// tstrncpy(ep.fqdn[i], info.epAddr[i].fqdn, tListLen(ep.fqdn[i]));
// }
if (pDcl->msgType == TDMT_VND_CREATE_TABLE) {
asyncSendMsgToServer(pTscObj->pTransporter, &pDcl->epSet, &transporterId, pSendMsg);
} else {
int64_t transporterId = 0;
SEpSet* pEpSet = &pTscObj->pAppInfo->mgmtEp.epSet;
asyncSendMsgToServer(pTscObj->pTransporter, pEpSet, &transporterId, pSendMsg);
}

View File

@ -433,7 +433,6 @@ TEST(testCase, create_topic_Test) {
taos_close(pConn);
}
//TEST(testCase, show_table_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);

View File

@ -70,6 +70,17 @@ bool isDclSqlStatement(SSqlInfo* pSqlInfo);
bool isDdlSqlStatement(SSqlInfo* pSqlInfo);
bool isDqlSqlStatement(SSqlInfo* pSqlInfo);
typedef struct SKvParam {
SKVRowBuilder *builder;
SSchema *schema;
char buf[TSDB_MAX_TAGS_LEN];
} SKvParam;
int32_t KvRowAppend(const void *value, int32_t len, void *param);
typedef int32_t (*_row_append_fn_t)(const void *value, int32_t len, void *param);
int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf);
#ifdef __cplusplus
}
#endif

View File

@ -429,7 +429,6 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
SSchema *pSchema = &pTagSchema[i];
SToken* pItem = taosArrayGet(pValList, i);
char tagVal[TSDB_MAX_TAGS_LEN];
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
if (pItem->n > pSchema->bytes) {
tdDestroyKVRowBuilder(&kvRowBuilder);
@ -446,26 +445,16 @@ int32_t doCheckForCreateCTable(SSqlInfo* pInfo, SParseBasicCtx *pCtx, SMsgBuf* p
// }
}
char* endPtr = NULL;
int64_t v = strtoll(pItem->z, &endPtr, 10);
*(int32_t*) tagVal = v;
// code = taosVariantDump(&(pItem->pVar), tagVal, pSchema->type, true);
char tmpTokenBuf[TSDB_MAX_TAGS_LEN] = {0};
SKvParam param = {.builder = &kvRowBuilder, .schema = pSchema};
// check again after the convert since it may be converted from binary to nchar.
if (pSchema->type == TSDB_DATA_TYPE_BINARY || pSchema->type == TSDB_DATA_TYPE_NCHAR) {
int16_t len = varDataTLen(tagVal);
if (len > pSchema->bytes) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return buildInvalidOperationMsg(pMsgBuf, msg3);
}
}
char* endPtr = NULL;
code = parseValueToken(&endPtr, pItem, pSchema, tinfo.precision, tmpTokenBuf, KvRowAppend, &param, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
tdDestroyKVRowBuilder(&kvRowBuilder);
return buildInvalidOperationMsg(pMsgBuf, msg4);
}
tdAddColToKVRow(&kvRowBuilder, pSchema->colId, pSchema->type, tagVal);
}
}

View File

@ -51,7 +51,7 @@ enum {
typedef struct SInsertParseContext {
SParseContext* pComCxt; // input
const char* pSql; // input
char *pSql; // input
SMsgBuf msg; // input
STableMeta* pTableMeta; // each table
SParsedDataColInfo tags; // each table
@ -64,22 +64,6 @@ typedef struct SInsertParseContext {
SInsertStmtInfo* pOutput;
} SInsertParseContext;
typedef int32_t (*FRowAppend)(const void *value, int32_t len, void *param);
typedef struct SKvParam {
char buf[TSDB_MAX_TAGS_LEN];
SKVRowBuilder* builder;
SSchema* schema;
} SKvParam;
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
static bool isNullStr(SToken *pToken) {
return (pToken->type == TK_NULL) || ((pToken->type == TK_STRING) && (pToken->n != 0) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0));
}
static FORCE_INLINE int32_t toDouble(SToken *pToken, double *value, char **endPtr) {
errno = 0;
*value = strtold(pToken->z, endPtr);
@ -263,23 +247,21 @@ static int32_t checkTimestamp(STableDataBlocks *pDataBlocks, const char *start)
return TSDB_CODE_SUCCESS;
}
static int parseTime(SInsertParseContext* pCxt, SToken *pToken, int16_t timePrec, int64_t *time) {
static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time, SMsgBuf* pMsgBuf) {
int32_t index = 0;
SToken sToken;
int64_t interval;
int64_t useconds = 0;
const char* pTokenEnd = pCxt->pSql;
int64_t ts = 0;
char* pTokenEnd = *end;
if (pToken->type == TK_NOW) {
useconds = taosGetTimestamp(timePrec);
} else if (strncmp(pToken->z, "0", 1) == 0 && pToken->n == 1) {
// do nothing
ts = taosGetTimestamp(timePrec);
} else if (pToken->type == TK_INTEGER) {
useconds = taosStr2int64(pToken->z);
} else {
// strptime("2001-11-12 18:31:01", "%Y-%m-%d %H:%M:%S", &tm);
bool isSigned = false;
toInteger(pToken->z, pToken->n, 10, &ts, &isSigned);
} else { // parse the RFC-3339/ISO-8601 timestamp format string
if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp format", pToken->z);
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
}
return TSDB_CODE_SUCCESS;
@ -288,8 +270,8 @@ static int parseTime(SInsertParseContext* pCxt, SToken *pToken, int16_t timePrec
for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
if (pToken->z[k] == ',') {
pCxt->pSql = pTokenEnd;
*time = useconds;
*end = pTokenEnd;
*time = ts;
return 0;
}
@ -311,7 +293,7 @@ static int parseTime(SInsertParseContext* pCxt, SToken *pToken, int16_t timePrec
pTokenEnd += index;
if (valueToken.n < 2) {
return buildSyntaxErrMsg(&pCxt->msg, "value expected in timestamp", sToken.z);
return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", sToken.z);
}
char unit = 0;
@ -320,34 +302,15 @@ static int parseTime(SInsertParseContext* pCxt, SToken *pToken, int16_t timePrec
}
if (sToken.type == TK_PLUS) {
useconds += interval;
ts += interval;
} else {
useconds = useconds - interval;
ts = ts - interval;
}
pCxt->pSql = pTokenEnd;
*end = pTokenEnd;
}
*time = useconds;
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t KvRowAppend(const void *value, int32_t len, void *param) {
SKvParam* pa = (SKvParam*)param;
if (TSDB_DATA_TYPE_BINARY == pa->schema->type) {
STR_WITH_SIZE_TO_VARSTR(pa->buf, value, len);
tdAddColToKVRow(pa->builder, pa->schema->colId, pa->schema->type, pa->buf);
} else if (TSDB_DATA_TYPE_NCHAR == pa->schema->type) {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
if (!taosMbsToUcs4(value, len, varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
varDataSetLen(pa->buf, output);
tdAddColToKVRow(pa->builder, pa->schema->colId, pa->schema->type, pa->buf);
} else {
tdAddColToKVRow(pa->builder, pa->schema->colId, pa->schema->type, value);
}
*time = ts;
return TSDB_CODE_SUCCESS;
}
@ -381,193 +344,206 @@ static FORCE_INLINE int32_t MemRowAppend(const void *value, int32_t len, void *p
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t checkAndTrimValue(SInsertParseContext* pCxt, SToken* pToken, SSchema* pSchema, char* tmpTokenBuf) {
int16_t type = pToken->type;
if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) ||
(pToken->n == 0) || (type == TK_RP)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid data or symbol", pToken->z);
}
//static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
// if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
// type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) ||
// (pToken->n == 0) || (type == TK_RP)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
// }
//
// if (IS_NUMERIC_TYPE(type) && pToken->n == 0) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid numeric data", pToken->z);
// }
//
// // Remove quotation marks
// if (TK_STRING == type) {
// if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
// return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
// }
//
// // delete escape character: \\, \', \"
// char delim = pToken->z[0];
// int32_t cnt = 0;
// int32_t j = 0;
// for (uint32_t k = 1; k < pToken->n - 1; ++k) {
// if (pToken->z[k] == '\\' || (pToken->z[k] == delim && pToken->z[k + 1] == delim)) {
// tmpTokenBuf[j] = pToken->z[k + 1];
// cnt++;
// j++;
// k++;
// continue;
// }
// tmpTokenBuf[j] = pToken->z[k];
// j++;
// }
//
// tmpTokenBuf[j] = 0;
// pToken->z = tmpTokenBuf;
// pToken->n -= 2 + cnt;
// }
//
// return TSDB_CODE_SUCCESS;
//}
if (IS_NUMERIC_TYPE(pSchema->type) && pToken->n == 0) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid numeric data", pToken->z);
}
// Remove quotation marks
if (TK_STRING == type) {
if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
return buildSyntaxErrMsg(&pCxt->msg, "too long string", pToken->z);
}
// delete escape character: \\, \', \"
char delim = pToken->z[0];
int32_t cnt = 0;
int32_t j = 0;
for (uint32_t k = 1; k < pToken->n - 1; ++k) {
if (pToken->z[k] == '\\' || (pToken->z[k] == delim && pToken->z[k + 1] == delim)) {
tmpTokenBuf[j] = pToken->z[k + 1];
cnt++;
j++;
k++;
continue;
}
tmpTokenBuf[j] = pToken->z[k];
j++;
}
tmpTokenBuf[j] = 0;
pToken->z = tmpTokenBuf;
pToken->n -= 2 + cnt;
}
return TSDB_CODE_SUCCESS;
}
static FORCE_INLINE int32_t parseOneValue(SInsertParseContext* pCxt, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, FRowAppend func, void* param) {
int64_t iv;
int32_t ret;
char * endptr = NULL;
CHECK_CODE(checkAndTrimValue(pCxt, pToken, pSchema, tmpTokenBuf));
if (isNullStr(pToken)) {
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
int64_t tmpVal = 0;
return func(&tmpVal, pSchema->bytes, param);
}
return func(getNullValue(pSchema->type), 0, param);
}
switch (pSchema->type) {
case TSDB_DATA_TYPE_BOOL: {
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
if (strncmp(pToken->z, "true", pToken->n) == 0) {
return func(&TRUE_VALUE, pSchema->bytes, param);
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
return func(&FALSE_VALUE, pSchema->bytes, param);
} else {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_INTEGER) {
return func(((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
} else if (pToken->type == TK_FLOAT) {
return func(((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
} else {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bool data", pToken->z);
}
break;
}
case TSDB_DATA_TYPE_TINYINT: {
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid tinyint data", pToken->z);
} else if (!IS_VALID_TINYINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "data overflow", pToken->z);
}
uint8_t tmpVal = (uint8_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_UTINYINT:{
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned tinyint data", pToken->z);
} else if (!IS_VALID_UTINYINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned tinyint data overflow", pToken->z);
}
uint8_t tmpVal = (uint8_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_SMALLINT: {
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid smallint data", pToken->z);
} else if (!IS_VALID_SMALLINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "smallint data overflow", pToken->z);
}
int16_t tmpVal = (int16_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_USMALLINT: {
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned smallint data", pToken->z);
} else if (!IS_VALID_USMALLINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned smallint data overflow", pToken->z);
}
uint16_t tmpVal = (uint16_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_INT: {
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid int data", pToken->z);
} else if (!IS_VALID_INT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "int data overflow", pToken->z);
}
int32_t tmpVal = (int32_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_UINT: {
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned int data", pToken->z);
} else if (!IS_VALID_UINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned int data overflow", pToken->z);
}
uint32_t tmpVal = (uint32_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_BIGINT: {
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, true)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid bigint data", pToken->z);
} else if (!IS_VALID_BIGINT(iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "bigint data overflow", pToken->z);
}
return func(&iv, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_UBIGINT: {
if (TSDB_CODE_SUCCESS != toInt64(pToken->z, pToken->type, pToken->n, &iv, false)) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid unsigned bigint data", pToken->z);
} else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
return buildSyntaxErrMsg(&pCxt->msg, "unsigned bigint data overflow", pToken->z);
}
uint64_t tmpVal = (uint64_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_FLOAT: {
double dv;
if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal float data", pToken->z);
}
float tmpVal = (float)dv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_DOUBLE: {
double dv;
if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
return buildSyntaxErrMsg(&pCxt->msg, "illegal double data", pToken->z);
}
return func(&dv, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_BINARY: {
// too long values will return invalid sql, not be truncated automatically
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
return buildSyntaxErrMsg(&pCxt->msg, "string data overflow", pToken->z);
}
return func(pToken->z, pToken->n, param);
}
case TSDB_DATA_TYPE_NCHAR: {
return func(pToken->z, pToken->n, param);
}
case TSDB_DATA_TYPE_TIMESTAMP: {
int64_t tmpVal;
if (parseTime(pCxt, pToken, timePrec, &tmpVal) != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(&pCxt->msg, "invalid timestamp", pToken->z);
}
return func(&tmpVal, pSchema->bytes, param);
}
}
return TSDB_CODE_FAILED;
}
//static FORCE_INLINE int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
// int64_t iv;
// char *endptr = NULL;
// bool isSigned = false;
//
// CHECK_CODE(checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf));
//
// if (isNullStr(pToken)) {
// if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
// int64_t tmpVal = 0;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// return func(getNullValue(pSchema->type), 0, param);
// }
//
// switch (pSchema->type) {
// case TSDB_DATA_TYPE_BOOL: {
// if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
// if (strncmp(pToken->z, "true", pToken->n) == 0) {
// return func(&TRUE_VALUE, pSchema->bytes, param);
// } else if (strncmp(pToken->z, "false", pToken->n) == 0) {
// return func(&FALSE_VALUE, pSchema->bytes, param);
// } else {
// return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
// }
// } else if (pToken->type == TK_INTEGER) {
// return func(((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
// } else if (pToken->type == TK_FLOAT) {
// return func(((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
// } else {
// return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
// }
// }
//
// case TSDB_DATA_TYPE_TINYINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
// } else if (!IS_VALID_TINYINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
// }
//
// uint8_t tmpVal = (uint8_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_UTINYINT:{
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
// } else if (!IS_VALID_UTINYINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
// }
// uint8_t tmpVal = (uint8_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_SMALLINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
// } else if (!IS_VALID_SMALLINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
// }
// int16_t tmpVal = (int16_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_USMALLINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
// } else if (!IS_VALID_USMALLINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
// }
// uint16_t tmpVal = (uint16_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_INT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
// } else if (!IS_VALID_INT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
// }
// int32_t tmpVal = (int32_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_UINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
// } else if (!IS_VALID_UINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
// }
// uint32_t tmpVal = (uint32_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_BIGINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
// } else if (!IS_VALID_BIGINT(iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
// }
// return func(&iv, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_UBIGINT: {
// if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
// } else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
// return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
// }
// uint64_t tmpVal = (uint64_t)iv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_FLOAT: {
// double dv;
// if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
// return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
// }
// if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
// return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
// }
// float tmpVal = (float)dv;
// return func(&tmpVal, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_DOUBLE: {
// double dv;
// if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
// return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
// }
// if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
// return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
// }
// return func(&dv, pSchema->bytes, param);
// }
//
// case TSDB_DATA_TYPE_BINARY: {
// // too long values will return invalid sql, not be truncated automatically
// if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
// return buildSyntaxErrMsg(pMsgBuf, "string data overflow", pToken->z);
// }
// return func(pToken->z, pToken->n, param);
// }
// case TSDB_DATA_TYPE_NCHAR: {
// return func(pToken->z, pToken->n, param);
// }
// case TSDB_DATA_TYPE_TIMESTAMP: {
// int64_t tmpVal;
// if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
// return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
// }
// return func(&tmpVal, pSchema->bytes, param);
// }
// }
//
// return TSDB_CODE_FAILED;
//}
// pSql -> tag1_name, ...)
static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* pColList, SSchema* pSchema) {
@ -644,7 +620,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pTagsSchema,
NEXT_TOKEN(pCxt->pSql, sToken);
SSchema* pSchema = &pTagsSchema[pCxt->tags.boundedColumns[i]];
param.schema = pSchema;
CHECK_CODE(parseOneValue(pCxt, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, &param));
CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, precision, tmpTokenBuf, KvRowAppend, &param, &pCxt->msg));
}
SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
@ -709,7 +685,7 @@ static int parseOneRow(SInsertParseContext* pCxt, STableDataBlocks* pDataBlocks,
param.schema = pSchema;
param.compareStat = pBuilder->compareStat;
getMemRowAppendInfo(schema, pBuilder->memRowType, spd, i, &param.toffset);
CHECK_CODE(parseOneValue(pCxt, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, &param));
CHECK_CODE(parseValueToken(&pCxt->pSql, &sToken, pSchema, timePrec, tmpTokenBuf, MemRowAppend, &param, &pCxt->msg));
if (PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
TSKEY tsKey = memRowKey(row);
@ -894,7 +870,7 @@ static int32_t parseInsertBody(SInsertParseContext* pCxt) {
int32_t parseInsertSql(SParseContext* pContext, SInsertStmtInfo** pInfo) {
SInsertParseContext context = {
.pComCxt = pContext,
.pSql = pContext->pSql,
.pSql = (char*) pContext->pSql,
.msg = {.buf = pContext->pMsg, .len = pContext->msgLen},
.pTableMeta = NULL,
.pVgroupsHashObj = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false),

View File

@ -13,19 +13,20 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tmsg.h"
#include "parser.h"
#include "taoserror.h"
#include "tutil.h"
#include "ttypes.h"
#include "thash.h"
#include "tbuffer.h"
#include "parserInt.h"
#include "parserUtil.h"
#include "tmsgtype.h"
#include "queryInfoUtil.h"
#include <tglobal.h>
#include <ttime.h>
#include "function.h"
#include "parser.h"
#include "parserInt.h"
#include "queryInfoUtil.h"
#include "taoserror.h"
#include "tbuffer.h"
#include "thash.h"
#include "tmsg.h"
#include "tmsgtype.h"
#include "ttypes.h"
#include "tutil.h"
typedef struct STableFilterCond {
uint64_t uid;
@ -1627,313 +1628,322 @@ bool isDqlSqlStatement(SSqlInfo* pSqlInfo) {
return pSqlInfo->type == TSDB_SQL_SELECT;
}
#if 0
int32_t tscCreateQueryFromQueryInfo(SQueryStmtInfo* pQueryInfo, SQueryAttr* pQueryAttr, void* addr) {
memset(pQueryAttr, 0, sizeof(SQueryAttr));
static uint8_t TRUE_VALUE = (uint8_t)TSDB_TRUE;
static uint8_t FALSE_VALUE = (uint8_t)TSDB_FALSE;
int16_t numOfCols = (int16_t) taosArrayGetSize(pQueryInfo->colList);
int16_t numOfOutput = (int16_t) getNumOfExprs(pQueryInfo);
static FORCE_INLINE int32_t toDouble(SToken *pToken, double *value, char **endPtr) {
errno = 0;
*value = strtold(pToken->z, endPtr);
pQueryAttr->topBotQuery = tscIsTopBotQuery(pQueryInfo);
pQueryAttr->hasTagResults = hasTagValOutput(pQueryInfo);
pQueryAttr->stabledev = isStabledev(pQueryInfo);
pQueryAttr->tsCompQuery = isTsCompQuery(pQueryInfo);
pQueryAttr->diffQuery = tscIsDiffDerivQuery(pQueryInfo);
pQueryAttr->simpleAgg = isSimpleAggregateRv(pQueryInfo);
pQueryAttr->needReverseScan = tscNeedReverseScan(pQueryInfo);
pQueryAttr->stableQuery = QUERY_IS_STABLE_QUERY(pQueryInfo->type);
pQueryAttr->groupbyColumn = (!pQueryInfo->stateWindow) && tscGroupbyColumn(pQueryInfo);
pQueryAttr->queryBlockDist = isBlockDistQuery(pQueryInfo);
pQueryAttr->pointInterpQuery = tscIsPointInterpQuery(pQueryInfo);
pQueryAttr->timeWindowInterpo = timeWindowInterpoRequired(pQueryInfo);
pQueryAttr->distinct = pQueryInfo->distinct;
pQueryAttr->sw = pQueryInfo->sessionWindow;
pQueryAttr->stateWindow = pQueryInfo->stateWindow;
pQueryAttr->multigroupResult = pQueryInfo->multigroupResult;
pQueryAttr->numOfCols = numOfCols;
pQueryAttr->numOfOutput = numOfOutput;
pQueryAttr->limit = pQueryInfo->limit;
pQueryAttr->slimit = pQueryInfo->slimit;
pQueryAttr->order = pQueryInfo->order;
pQueryAttr->fillType = pQueryInfo->fillType;
pQueryAttr->havingNum = pQueryInfo->havingFieldNum;
pQueryAttr->pUdfInfo = pQueryInfo->pUdfInfo;
if (pQueryInfo->order.order == TSDB_ORDER_ASC) { // TODO refactor
pQueryAttr->window = pQueryInfo->window;
} else {
pQueryAttr->window.skey = pQueryInfo->window.ekey;
pQueryAttr->window.ekey = pQueryInfo->window.skey;
// not a valid integer number, return error
if ((*endPtr - pToken->z) != pToken->n) {
return TK_ILLEGAL;
}
memcpy(&pQueryAttr->interval, &pQueryInfo->interval, sizeof(pQueryAttr->interval));
return pToken->type;
}
STableMetaInfo* pTableMetaInfo = pQueryInfo->pTableMetaInfo[0];
static bool isNullStr(SToken *pToken) {
return (pToken->type == TK_NULL) || ((pToken->type == TK_STRING) && (pToken->n != 0) &&
(strncasecmp(TSDB_DATA_NULL_STR_L, pToken->z, pToken->n) == 0));
}
if (pQueryInfo->groupbyExpr.numOfGroupCols > 0) {
pQueryAttr->pGroupbyExpr = calloc(1, sizeof(SGroupbyExpr));
*(pQueryAttr->pGroupbyExpr) = pQueryInfo->groupbyExpr;
pQueryAttr->pGroupbyExpr->columnInfo = taosArrayDup(pQueryInfo->groupbyExpr.columnInfo);
} else {
assert(pQueryInfo->groupbyExpr.columnInfo == NULL);
static FORCE_INLINE int32_t checkAndTrimValue(SToken* pToken, uint32_t type, char* tmpTokenBuf, SMsgBuf* pMsgBuf) {
if ((type != TK_NOW && type != TK_INTEGER && type != TK_STRING && type != TK_FLOAT && type != TK_BOOL &&
type != TK_NULL && type != TK_HEX && type != TK_OCT && type != TK_BIN) ||
(pToken->n == 0) || (type == TK_RP)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid data or symbol", pToken->z);
}
pQueryAttr->pExpr1 = calloc(pQueryAttr->numOfOutput, sizeof(SExprInfo));
for(int32_t i = 0; i < pQueryAttr->numOfOutput; ++i) {
SExprInfo* pExpr = getExprInfo(pQueryInfo, i);
ExprInfoCopy(&pQueryAttr->pExpr1[i], pExpr);
if (pQueryAttr->pExpr1[i].base.functionId == FUNCTION_ARITHM) {
for (int32_t j = 0; j < pQueryAttr->pExpr1[i].base.numOfParams; ++j) {
buildArithmeticExprFromMsg(&pQueryAttr->pExpr1[i], NULL);
}
}
if (IS_NUMERIC_TYPE(type) && pToken->n == 0) {
return buildSyntaxErrMsg(pMsgBuf, "invalid numeric data", pToken->z);
}
pQueryAttr->tableCols = calloc(numOfCols, sizeof(SColumnInfo));
for(int32_t i = 0; i < numOfCols; ++i) {
SColumn* pCol = taosArrayGetP(pQueryInfo->colList, i);
if (!isValidDataType(pCol->info.type) || pCol->info.type == TSDB_DATA_TYPE_NULL) {
assert(0);
// Remove quotation marks
if (TK_STRING == type) {
if (pToken->n >= TSDB_MAX_BYTES_PER_ROW) {
return buildSyntaxErrMsg(pMsgBuf, "too long string", pToken->z);
}
pQueryAttr->tableCols[i] = pCol->info;
pQueryAttr->tableCols[i].flist.filterInfo = tFilterInfoDup(pCol->info.flist.filterInfo, pQueryAttr->tableCols[i].flist.numOfFilters);
// delete escape character: \\, \', \"
char delim = pToken->z[0];
int32_t cnt = 0;
int32_t j = 0;
for (uint32_t k = 1; k < pToken->n - 1; ++k) {
if (pToken->z[k] == '\\' || (pToken->z[k] == delim && pToken->z[k + 1] == delim)) {
tmpTokenBuf[j] = pToken->z[k + 1];
cnt++;
j++;
k++;
continue;
}
tmpTokenBuf[j] = pToken->z[k];
j++;
}
// global aggregate query
if (pQueryAttr->stableQuery && (pQueryAttr->simpleAgg || pQueryAttr->interval.interval > 0) && tscIsTwoStageSTableQuery(pQueryInfo, 0)) {
createGlobalAggregateExpr(pQueryAttr, pQueryInfo);
}
// for simple table, not for super table
if (pQueryInfo->arithmeticOnAgg) {
pQueryAttr->numOfExpr2 = (int32_t) taosArrayGetSize(pQueryInfo->exprList1);
pQueryAttr->pExpr2 = calloc(pQueryAttr->numOfExpr2, sizeof(SExprInfo));
for(int32_t i = 0; i < pQueryAttr->numOfExpr2; ++i) {
SExprInfo* p = taosArrayGetP(pQueryInfo->exprList1, i);
ExprInfoCopy(&pQueryAttr->pExpr2[i], p);
}
}
// tag column info
int32_t code = createTagColumnInfo(pQueryAttr, pQueryInfo, pTableMetaInfo);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (pQueryAttr->fillType != TSDB_FILL_NONE) {
pQueryAttr->fillVal = calloc(pQueryAttr->numOfOutput, sizeof(int64_t));
memcpy(pQueryAttr->fillVal, pQueryInfo->fillVal, pQueryInfo->numOfFillVal * sizeof(int64_t));
}
pQueryAttr->srcRowSize = 0;
pQueryAttr->maxTableColumnWidth = 0;
for (int16_t i = 0; i < numOfCols; ++i) {
pQueryAttr->srcRowSize += pQueryAttr->tableCols[i].bytes;
if (pQueryAttr->maxTableColumnWidth < pQueryAttr->tableCols[i].bytes) {
pQueryAttr->maxTableColumnWidth = pQueryAttr->tableCols[i].bytes;
}
}
pQueryAttr->interBufSize = getOutputInterResultBufSize(pQueryAttr);
if (pQueryAttr->numOfCols <= 0 && !tscQueryTags(pQueryInfo) && !pQueryAttr->queryBlockDist) {
tscError("%p illegal value of numOfCols in query msg: %" PRIu64 ", table cols:%d", addr,
(uint64_t)pQueryAttr->numOfCols, numOfCols);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (pQueryAttr->interval.interval < 0) {
tscError("%p illegal value of aggregation time interval in query msg: %" PRId64, addr,
(int64_t)pQueryInfo->interval.interval);
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (pQueryAttr->pGroupbyExpr != NULL && pQueryAttr->pGroupbyExpr->numOfGroupCols < 0) {
tscError("%p illegal value of numOfGroupCols in query msg: %d", addr, pQueryInfo->groupbyExpr.numOfGroupCols);
return TSDB_CODE_TSC_INVALID_OPERATION;
tmpTokenBuf[j] = 0;
pToken->z = tmpTokenBuf;
pToken->n -= 2 + cnt;
}
return TSDB_CODE_SUCCESS;
}
static int32_t doAddTableName(char* nextStr, char** str, SArray* pNameArray, SSqlObj* pSql) {
int32_t code = TSDB_CODE_SUCCESS;
SSqlCmd* pCmd = &pSql->cmd;
static int parseTime(char **end, SToken *pToken, int16_t timePrec, int64_t *time, SMsgBuf* pMsgBuf) {
int32_t index = 0;
SToken sToken;
int64_t interval;
int64_t ts = 0;
char* pTokenEnd = *end;
char tablename[TSDB_TABLE_FNAME_LEN] = {0};
int32_t len = 0;
if (nextStr == NULL) {
tstrncpy(tablename, *str, TSDB_TABLE_FNAME_LEN);
len = (int32_t) strlen(tablename);
} else {
len = (int32_t)(nextStr - (*str));
if (len >= TSDB_TABLE_NAME_LEN) {
sprintf(pCmd->payload, "table name too long");
return TSDB_CODE_TSC_INVALID_OPERATION;
if (pToken->type == TK_NOW) {
ts = taosGetTimestamp(timePrec);
} else if (pToken->type == TK_INTEGER) {
bool isSigned = false;
toInteger(pToken->z, pToken->n, 10, &ts, &isSigned);
} else { // parse the RFC-3339/ISO-8601 timestamp format string
if (taosParseTime(pToken->z, time, pToken->n, timePrec, tsDaylight) != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp format", pToken->z);
}
memcpy(tablename, *str, nextStr - (*str));
tablename[len] = '\0';
}
(*str) = nextStr + 1;
len = (int32_t)strtrim(tablename);
SToken sToken = {.n = len, .type = TK_ID, .z = tablename};
tGetToken(tablename, &sToken.type);
// Check if the table name available or not
if (tscValidateName(&sToken) != TSDB_CODE_SUCCESS) {
sprintf(pCmd->payload, "table name is invalid");
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
SName name = {0};
if ((code = tscSetTableFullName(&name, &sToken, pSql)) != TSDB_CODE_SUCCESS) {
return code;
}
memset(tablename, 0, tListLen(tablename));
tNameExtractFullName(&name, tablename);
char* p = strdup(tablename);
taosArrayPush(pNameArray, &p);
return TSDB_CODE_SUCCESS;
}
}
int32_t nameComparFn(const void* n1, const void* n2) {
int32_t ret = strcmp(*(char**)n1, *(char**)n2);
if (ret == 0) {
for (int k = pToken->n; pToken->z[k] != '\0'; k++) {
if (pToken->z[k] == ' ' || pToken->z[k] == '\t') continue;
if (pToken->z[k] == ',') {
*end = pTokenEnd;
*time = ts;
return 0;
} else {
return ret > 0? 1:-1;
}
}
static void freeContent(void* p) {
char* ptr = *(char**)p;
tfree(ptr);
}
int tscTransferTableNameList(SSqlObj *pSql, const char *pNameList, int32_t length, SArray* pNameArray) {
SSqlCmd *pCmd = &pSql->cmd;
pCmd->command = TSDB_SQL_MULTI_META;
pCmd->msgType = TDMT_VND_TABLES_META;
int code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
char *str = (char *)pNameList;
SQueryStmtInfo *pQueryInfo = tscGetQueryInfoS(pCmd);
if (pQueryInfo == NULL) {
pSql->res.code = terrno;
return terrno;
}
char *nextStr;
while (1) {
nextStr = strchr(str, ',');
if (nextStr == NULL) {
code = doAddTableName(nextStr, &str, pNameArray, pSql);
break;
}
code = doAddTableName(nextStr, &str, pNameArray, pSql);
/*
* time expression:
* e.g., now+12a, now-5h
*/
SToken valueToken;
index = 0;
sToken = tStrGetToken(pTokenEnd, &index, false);
pTokenEnd += index;
if (sToken.type == TK_MINUS || sToken.type == TK_PLUS) {
index = 0;
valueToken = tStrGetToken(pTokenEnd, &index, false);
pTokenEnd += index;
if (valueToken.n < 2) {
return buildSyntaxErrMsg(pMsgBuf, "value expected in timestamp", sToken.z);
}
char unit = 0;
if (parseAbsoluteDuration(valueToken.z, valueToken.n, &interval, &unit, timePrec) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_OPERATION;
}
if (sToken.type == TK_PLUS) {
ts += interval;
} else {
ts = ts - interval;
}
*end = pTokenEnd;
}
*time = ts;
return TSDB_CODE_SUCCESS;
}
int32_t parseValueToken(char** end, SToken* pToken, SSchema* pSchema, int16_t timePrec, char* tmpTokenBuf, _row_append_fn_t func, void* param, SMsgBuf* pMsgBuf) {
int64_t iv;
char *endptr = NULL;
bool isSigned = false;
int32_t code = checkAndTrimValue(pToken, pSchema->type, tmpTokenBuf, pMsgBuf);
if (code != TSDB_CODE_SUCCESS) {
return code;
}
if (taosArrayGetSize(pNameArray) > TSDB_MULTI_TABLEMETA_MAX_NUM) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pCmd->payload, "tables over the max number");
return code;
if (isNullStr(pToken)) {
if (TSDB_DATA_TYPE_TIMESTAMP == pSchema->type && PRIMARYKEY_TIMESTAMP_COL_ID == pSchema->colId) {
int64_t tmpVal = 0;
return func(&tmpVal, pSchema->bytes, param);
}
return func(getNullValue(pSchema->type), 0, param);
}
switch (pSchema->type) {
case TSDB_DATA_TYPE_BOOL: {
if ((pToken->type == TK_BOOL || pToken->type == TK_STRING) && (pToken->n != 0)) {
if (strncmp(pToken->z, "true", pToken->n) == 0) {
return func(&TRUE_VALUE, pSchema->bytes, param);
} else if (strncmp(pToken->z, "false", pToken->n) == 0) {
return func(&FALSE_VALUE, pSchema->bytes, param);
} else {
return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
}
} else if (pToken->type == TK_INTEGER) {
return func(((strtoll(pToken->z, NULL, 10) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
} else if (pToken->type == TK_FLOAT) {
return func(((strtod(pToken->z, NULL) == 0) ? &FALSE_VALUE : &TRUE_VALUE), pSchema->bytes, param);
} else {
return buildSyntaxErrMsg(pMsgBuf, "invalid bool data", pToken->z);
}
}
size_t len = taosArrayGetSize(pNameArray);
if (len == 1) {
return TSDB_CODE_SUCCESS;
case TSDB_DATA_TYPE_TINYINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid tinyint data", pToken->z);
} else if (!IS_VALID_TINYINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "tinyint data overflow", pToken->z);
}
if (len > TSDB_MULTI_TABLEMETA_MAX_NUM) {
code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
sprintf(pCmd->payload, "tables over the max number");
return code;
uint8_t tmpVal = (uint8_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_UTINYINT:{
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned tinyint data", pToken->z);
} else if (!IS_VALID_UTINYINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned tinyint data overflow", pToken->z);
}
uint8_t tmpVal = (uint8_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_SMALLINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid smallint data", pToken->z);
} else if (!IS_VALID_SMALLINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "smallint data overflow", pToken->z);
}
int16_t tmpVal = (int16_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_USMALLINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned smallint data", pToken->z);
} else if (!IS_VALID_USMALLINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned smallint data overflow", pToken->z);
}
uint16_t tmpVal = (uint16_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_INT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid int data", pToken->z);
} else if (!IS_VALID_INT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "int data overflow", pToken->z);
}
int32_t tmpVal = (int32_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_UINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned int data", pToken->z);
} else if (!IS_VALID_UINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned int data overflow", pToken->z);
}
uint32_t tmpVal = (uint32_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_BIGINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid bigint data", pToken->z);
} else if (!IS_VALID_BIGINT(iv)) {
return buildSyntaxErrMsg(pMsgBuf, "bigint data overflow", pToken->z);
}
return func(&iv, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_UBIGINT: {
if (TSDB_CODE_SUCCESS != toInteger(pToken->z, pToken->n, pToken->type, &iv, &isSigned)) {
return buildSyntaxErrMsg(pMsgBuf, "invalid unsigned bigint data", pToken->z);
} else if (!IS_VALID_UBIGINT((uint64_t)iv)) {
return buildSyntaxErrMsg(pMsgBuf, "unsigned bigint data overflow", pToken->z);
}
uint64_t tmpVal = (uint64_t)iv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_FLOAT: {
double dv;
if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || dv > FLT_MAX || dv < -FLT_MAX || isinf(dv) || isnan(dv)) {
return buildSyntaxErrMsg(pMsgBuf, "illegal float data", pToken->z);
}
float tmpVal = (float)dv;
return func(&tmpVal, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_DOUBLE: {
double dv;
if (TK_ILLEGAL == toDouble(pToken, &dv, &endptr)) {
return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
}
if (((dv == HUGE_VAL || dv == -HUGE_VAL) && errno == ERANGE) || isinf(dv) || isnan(dv)) {
return buildSyntaxErrMsg(pMsgBuf, "illegal double data", pToken->z);
}
return func(&dv, pSchema->bytes, param);
}
case TSDB_DATA_TYPE_BINARY: {
// Too long values will raise the invalid sql error message
if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) {
return buildSyntaxErrMsg(pMsgBuf, "string data overflow", pToken->z);
}
return func(pToken->z, pToken->n, param);
}
case TSDB_DATA_TYPE_NCHAR: {
return func(pToken->z, pToken->n, param);
}
case TSDB_DATA_TYPE_TIMESTAMP: {
int64_t tmpVal;
if (parseTime(end, pToken, timePrec, &tmpVal, pMsgBuf) != TSDB_CODE_SUCCESS) {
return buildSyntaxErrMsg(pMsgBuf, "invalid timestamp", pToken->z);
}
return func(&tmpVal, pSchema->bytes, param);
}
}
return TSDB_CODE_FAILED;
}
int32_t KvRowAppend(const void *value, int32_t len, void *param) {
SKvParam* pa = (SKvParam*) param;
int32_t type = pa->schema->type;
int32_t colId = pa->schema->colId;
if (TSDB_DATA_TYPE_BINARY == type) {
STR_WITH_SIZE_TO_VARSTR(pa->buf, value, len);
tdAddColToKVRow(pa->builder, colId, type, pa->buf);
} else if (TSDB_DATA_TYPE_NCHAR == type) {
// if the converted output len is over than pColumnModel->bytes, return error: 'Argument list too long'
int32_t output = 0;
if (!taosMbsToUcs4(value, len, varDataVal(pa->buf), pa->schema->bytes - VARSTR_HEADER_SIZE, &output)) {
return TSDB_CODE_TSC_SQL_SYNTAX_ERROR;
}
varDataSetLen(pa->buf, output);
tdAddColToKVRow(pa->builder, colId, type, pa->buf);
} else {
tdAddColToKVRow(pa->builder, colId, type, value);
}
taosArraySort(pNameArray, nameComparFn);
taosArrayRemoveDuplicate(pNameArray, nameComparFn, freeContent);
return TSDB_CODE_SUCCESS;
}
bool vgroupInfoIdentical(SNewVgroupInfo *pExisted, SVgroupMsg* src) {
assert(pExisted != NULL && src != NULL);
if (pExisted->numOfEps != src->numOfEps) {
return false;
}
for(int32_t i = 0; i < pExisted->numOfEps; ++i) {
if (pExisted->ep[i].port != src->epAddr[i].port) {
return false;
}
if (strncmp(pExisted->ep[i].fqdn, src->epAddr[i].fqdn, tListLen(pExisted->ep[i].fqdn)) != 0) {
return false;
}
}
return true;
}
SNewVgroupInfo createNewVgroupInfo(SVgroupMsg *pVgroupMsg) {
assert(pVgroupMsg != NULL);
SNewVgroupInfo info = {0};
info.numOfEps = pVgroupMsg->numOfEps;
info.vgId = pVgroupMsg->vgId;
info.inUse = 0; // 0 is the default value of inUse in case of multiple replica
assert(info.numOfEps >= 1 && info.vgId >= 1);
for(int32_t i = 0; i < pVgroupMsg->numOfEps; ++i) {
tstrncpy(info.ep[i].fqdn, pVgroupMsg->epAddr[i].fqdn, TSDB_FQDN_LEN);
info.ep[i].port = pVgroupMsg->epAddr[i].port;
}
return info;
}
char* cloneCurrentDBName(SSqlObj* pSql) {
char *p = NULL;
HttpContext *pCtx = NULL;
pthread_mutex_lock(&pSql->pTscObj->mutex);
STscObj *pTscObj = pSql->pTscObj;
switch (pTscObj->from) {
case TAOS_REQ_FROM_HTTP:
pCtx = pSql->param;
if (pCtx && pCtx->db[0] != '\0') {
char db[TSDB_DB_FNAME_LEN] = {0};
int32_t len = sprintf(db, "%s%s%s", pTscObj->acctId, TS_PATH_DELIMITER, pCtx->db);
assert(len <= sizeof(db));
p = strdup(db);
}
break;
default:
break;
}
if (p == NULL) {
p = strdup(pSql->pTscObj->db);
}
pthread_mutex_unlock(&pSql->pTscObj->mutex);
return p;
}
#endif