Merge pull request #19005 from taosdata/refact/submit_req_marks

opti:schemaless logic
This commit is contained in:
WANG MINGMING 2022-12-19 10:29:36 +08:00 committed by GitHub
commit b50103d437
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 632 additions and 909 deletions

View File

@ -113,7 +113,7 @@ typedef struct {
int32_t sTableNameLen;
char childTableName[TSDB_TABLE_NAME_LEN];
uint64_t uid;
void *key; // for openTsdb telnet
void *key; // for openTsdb
SArray *tags;
@ -177,7 +177,8 @@ typedef struct {
int32_t lineNum;
SSmlMsgBuf msgBuf;
cJSON *root; // for parse json
// cJSON *root; // for parse json
int8_t offset[4];
SSmlLineInfo *lines; // element is SSmlLineInfo
//
@ -216,8 +217,9 @@ SSmlSTableMeta* smlBuildSTableMeta(bool isDataFormat);
int32_t smlSetCTableName(SSmlTableInfo *oneTable);
STableMeta* smlGetMeta(SSmlHandle *info, const void* measure, int32_t measureLen);
int32_t is_same_child_table_telnet(const void *a, const void *b);
int32_t is_same_child_table_json(const void *a, const void *b);
int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len);
int32_t smlClearForRerun(SSmlHandle *info);
int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg);
int32_t smlParseInfluxString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);
int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLineInfo *elements);

View File

@ -235,7 +235,182 @@ SSmlSTableMeta *smlBuildSTableMeta(bool isDataFormat) {
return NULL;
}
//uint16_t smlCalTypeSum(char* endptr, int32_t left){
// uint16_t sum = 0;
// for(int i = 0; i < left; i++){
// sum += endptr[i];
// }
// return sum;
//}
#define RETURN_FALSE \
smlBuildInvalidDataMsg(msg, "invalid data", pVal); \
return false;
#define SET_DOUBLE kvVal->type = TSDB_DATA_TYPE_DOUBLE;\
kvVal->d = result;
#define SET_FLOAT \
if (!IS_VALID_FLOAT(result)) {\
smlBuildInvalidDataMsg(msg, "float out of range[-3.402823466e+38,3.402823466e+38]", pVal);\
return false;\
}\
kvVal->type = TSDB_DATA_TYPE_FLOAT;\
kvVal->f = (float)result;
#define SET_BIGINT \
if (smlDoubleToInt64OverFlow(result)) {\
errno = 0;\
int64_t tmp = taosStr2Int64(pVal, &endptr, 10);\
if (errno == ERANGE) {\
smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", pVal);\
return false;\
}\
kvVal->type = TSDB_DATA_TYPE_BIGINT;\
kvVal->i = tmp;\
return true;\
}\
kvVal->type = TSDB_DATA_TYPE_BIGINT;\
kvVal->i = (int64_t)result;
#define SET_INT \
if (!IS_VALID_INT(result)) {\
smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);\
return false;\
}\
kvVal->type = TSDB_DATA_TYPE_INT;\
kvVal->i = result;
#define SET_SMALL_INT \
if (!IS_VALID_SMALLINT(result)) {\
smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", pVal);\
return false;\
}\
kvVal->type = TSDB_DATA_TYPE_SMALLINT;\
kvVal->i = result;
#define SET_UBIGINT \
if (result >= (double)UINT64_MAX || result < 0) {\
errno = 0;\
uint64_t tmp = taosStr2UInt64(pVal, &endptr, 10);\
if (errno == ERANGE || result < 0) {\
smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", pVal);\
return false;\
}\
kvVal->type = TSDB_DATA_TYPE_UBIGINT;\
kvVal->u = tmp;\
return true;\
}\
kvVal->type = TSDB_DATA_TYPE_UBIGINT;\
kvVal->u = result;
#define SET_UINT \
if (!IS_VALID_UINT(result)) {\
smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", pVal);\
return false;\
}\
kvVal->type = TSDB_DATA_TYPE_UINT;\
kvVal->u = result;
#define SET_USMALL_INT \
if (!IS_VALID_USMALLINT(result)) {\
smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", pVal);\
return false;\
}\
kvVal->type = TSDB_DATA_TYPE_USMALLINT;\
kvVal->u = result;
#define SET_TINYINT \
if (!IS_VALID_TINYINT(result)) { \
smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", pVal);\
return false;\
}\
kvVal->type = TSDB_DATA_TYPE_TINYINT;\
kvVal->i = result;
#define SET_UTINYINT \
if (!IS_VALID_UTINYINT(result)) {\
smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", pVal);\
return false;\
}\
kvVal->type = TSDB_DATA_TYPE_UTINYINT;\
kvVal->u = result;
bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->length;
char * endptr = NULL;
double result = taosStr2Double(pVal, &endptr);
if (pVal == endptr) {
RETURN_FALSE
}
int32_t left = len - (endptr - pVal);
if (left == 0) {
SET_DOUBLE
} else if (left == 3) {
if(endptr[0] == 'f' || endptr[0] == 'F'){
if(endptr[1] == '6' && endptr[2] == '4'){
SET_DOUBLE
}else if(endptr[1] == '3' && endptr[2] == '2'){
SET_FLOAT
}else{
RETURN_FALSE
}
}else if(endptr[0] == 'i' || endptr[0] == 'I'){
if(endptr[1] == '6' && endptr[2] == '4'){
SET_BIGINT
}else if(endptr[1] == '3' && endptr[2] == '2'){
SET_INT
}else if(endptr[1] == '1' && endptr[2] == '6'){
SET_SMALL_INT
}else{
RETURN_FALSE
}
}else if(endptr[0] == 'u' || endptr[0] == 'U'){
if(endptr[1] == '6' && endptr[2] == '4'){
SET_UBIGINT
}else if(endptr[1] == '3' && endptr[2] == '2'){
SET_UINT
}else if(endptr[1] == '1' && endptr[2] == '6'){
SET_USMALL_INT
}else{
RETURN_FALSE
}
}else{
RETURN_FALSE
}
} else if(left == 2){
if(endptr[0] == 'i' || endptr[0] == 'I'){
if(endptr[1] == '8') {
SET_TINYINT
}else{
RETURN_FALSE
}
}else if(endptr[0] == 'u' || endptr[0] == 'U') {
if (endptr[1] == '8') {
SET_UTINYINT
} else {
RETURN_FALSE
}
}else{
RETURN_FALSE
}
} else if(left == 1){
if(endptr[0] == 'i' || endptr[0] == 'I'){
SET_BIGINT
}else if(endptr[0] == 'u' || endptr[0] == 'U') {
SET_UBIGINT
}else{
RETURN_FALSE
}
} else {
RETURN_FALSE;
}
return true;
}
bool smlParseNumberOld(SSmlKv *kvVal, SSmlMsgBuf *msg) {
const char *pVal = kvVal->value;
int32_t len = kvVal->length;
char *endptr = NULL;
@ -885,7 +1060,6 @@ static void smlDestroyInfo(SSmlHandle *info) {
taosMemoryFree(info->lines);
}
cJSON_Delete(info->root);
taosMemoryFreeClear(info);
}
@ -950,7 +1124,7 @@ static int32_t smlParseLineBottom(SSmlHandle *info) {
}else if(info->protocol == TSDB_SML_TELNET_PROTOCOL){
tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet);
}else{
tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->tags, POINTER_BYTES, is_same_child_table_json);
tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet);
}
if(tinfo == NULL){
@ -1201,7 +1375,7 @@ static int smlProcess(SSmlHandle *info, char *lines[], char *rawLine, char *rawL
return code;
}
info->cost.lineNum = numLines;
info->cost.lineNum = info->lineNum;
info->cost.numOfSTables = nodeListSize(info->superTables);
info->cost.numOfCTables = nodeListSize(info->childTables);
@ -1288,10 +1462,9 @@ TAOS_RES *taos_schemaless_insert_inner(TAOS *taos, char *lines[], char *rawLine,
request->code = code;
info->cost.endTime = taosGetTimestampUs();
info->cost.code = code;
smlPrintStatisticInfo(info);
// smlPrintStatisticInfo(info);
end:
uDebug("resultend:%s", request->msgBuf);
smlDestroyInfo(info);
return (TAOS_RES *)request;
}

View File

@ -19,377 +19,65 @@
#include <string.h>
#include "clientSml.h"
#define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM 4
#define JSON_METERS_NAME "__JM"
int32_t is_same_child_table_json(const void *a, const void *b){
return (cJSON_Compare((const cJSON *)a, (const cJSON *)b, true)) ? 0 : 1;
}
static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
cJSON *metric = cJSON_GetObjectItem(root, "metric");
if (!cJSON_IsString(metric)) {
return TSDB_CODE_TSC_INVALID_JSON;
#define JUMP_JSON_SPACE(start) \
while(*(start)){\
if(unlikely(*(start) > 32))\
break;\
else\
(start)++;\
}
elements->measureLen = strlen(metric->valuestring);
if (IS_INVALID_TABLE_LEN(elements->measureLen)) {
uError("OTD:0x%" PRIx64 " Metric lenght is 0 or large than 192", info->id);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
}
elements->measure = metric->valuestring;
return TSDB_CODE_SUCCESS;
}
static int64_t smlParseTSFromJSONObj(SSmlHandle *info, cJSON *root, int32_t toPrecision) {
int32_t size = cJSON_GetArraySize(root);
if (unlikely(size != OTD_JSON_SUB_FIELDS_NUM)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
}
cJSON *value = cJSON_GetObjectItem(root, "value");
if (unlikely(!cJSON_IsNumber(value))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
}
cJSON *type = cJSON_GetObjectItem(root, "type");
if (unlikely(!cJSON_IsString(type))) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalidate json", NULL);
return -1;
}
double timeDouble = value->valuedouble;
if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return -1;
}
if (timeDouble == 0) {
return taosGetTimestampNs()/smlFactorNS[toPrecision];
}
if (timeDouble < 0) {
return timeDouble;
}
int64_t tsInt64 = timeDouble;
size_t typeLen = strlen(type->valuestring);
if (typeLen == 1 && (type->valuestring[0] == 's' || type->valuestring[0] == 'S')) {
// seconds
int8_t fromPrecision = TSDB_TIME_PRECISION_SECONDS;
if(smlFactorS[toPrecision] < INT64_MAX / tsInt64){
return tsInt64 * smlFactorS[toPrecision];
}
return -1;
} else if (typeLen == 2 && (type->valuestring[1] == 's' || type->valuestring[1] == 'S')) {
switch (type->valuestring[0]) {
case 'm':
case 'M':
// milliseconds
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MILLI, toPrecision);
static SArray *smlJsonParseTags(char *start, char *end){
SArray *tags = taosArrayInit(4, sizeof(SSmlKv));
while(start < end){
SSmlKv kv = {0};
kv.type = TSDB_DATA_TYPE_NCHAR;
bool isInQuote = false;
while(start < end){
if(unlikely(!isInQuote && *start == '"')){
start++;
kv.key = start;
isInQuote = true;
continue;
}
if(unlikely(isInQuote && *start == '"')){
kv.keyLen = start - kv.key;
start++;
break;
case 'u':
case 'U':
// microseconds
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_MICRO, toPrecision);
}
start++;
}
bool hasColon = false;
while(start < end){
if(unlikely(!hasColon && *start == ':')){
start++;
hasColon = true;
continue;
}
if(unlikely(hasColon && kv.value == NULL && (*start > 32 && *start != '"'))){
kv.value = start;
start++;
continue;
}
if(unlikely(hasColon && kv.value != NULL && (*start == '"' || *start == ',' || *start == '}'))){
kv.length = start - kv.value;
taosArrayPush(tags, &kv);
start++;
break;
case 'n':
case 'N':
return convertTimePrecision(tsInt64, TSDB_TIME_PRECISION_NANO, toPrecision);
break;
default:
return -1;
}
} else {
return -1;
}
}
static uint8_t smlGetTimestampLen(int64_t num) {
uint8_t len = 0;
while ((num /= 10) != 0) {
len++;
}
len++;
return len;
}
static int64_t smlParseTSFromJSON(SSmlHandle *info, cJSON *root) {
// Timestamp must be the first KV to parse
int32_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;
cJSON *timestamp = cJSON_GetObjectItem(root, "timestamp");
if (cJSON_IsNumber(timestamp)) {
// timestamp value 0 indicates current system time
double timeDouble = timestamp->valuedouble;
if (unlikely(smlDoubleToInt64OverFlow(timeDouble))) {
smlBuildInvalidDataMsg(&info->msgBuf, "timestamp is too large", NULL);
return -1;
}
if (unlikely(timeDouble < 0)) {
smlBuildInvalidDataMsg(&info->msgBuf,
"timestamp is negative", NULL);
return timeDouble;
}else if (unlikely(timeDouble == 0)) {
return taosGetTimestampNs()/smlFactorNS[toPrecision];
}
uint8_t tsLen = smlGetTimestampLen((int64_t)timeDouble);
int8_t fromPrecision = smlGetTsTypeByLen(tsLen);
if (unlikely(fromPrecision == -1)) {
smlBuildInvalidDataMsg(&info->msgBuf,
"timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", NULL);
return -1;
}
int64_t tsInt64 = timeDouble;
if(fromPrecision == TSDB_TIME_PRECISION_SECONDS){
if(smlFactorS[toPrecision] < INT64_MAX / tsInt64){
return tsInt64 * smlFactorS[toPrecision];
}
return -1;
}else{
return convertTimePrecision(timeDouble, fromPrecision, toPrecision);
start++;
}
} else if (cJSON_IsObject(timestamp)) {
return smlParseTSFromJSONObj(info, timestamp, toPrecision);
} else {
smlBuildInvalidDataMsg(&info->msgBuf,
"invalidate json", NULL);
return -1;
}
return tags;
}
static int32_t smlConvertJSONBool(SSmlKv *pVal, char *typeStr, cJSON *value) {
if (strcasecmp(typeStr, "bool") != 0) {
uError("OTD:invalid type(%s) for JSON Bool", typeStr);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
pVal->type = TSDB_DATA_TYPE_BOOL;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->i = value->valueint;
return TSDB_CODE_SUCCESS;
}
static int32_t smlConvertJSONNumber(SSmlKv *pVal, char *typeStr, cJSON *value) {
// tinyint
if (strcasecmp(typeStr, "i8") == 0 || strcasecmp(typeStr, "tinyint") == 0) {
if (!IS_VALID_TINYINT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(tinyint)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_TINYINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->i = value->valuedouble;
return TSDB_CODE_SUCCESS;
}
// smallint
if (strcasecmp(typeStr, "i16") == 0 || strcasecmp(typeStr, "smallint") == 0) {
if (!IS_VALID_SMALLINT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(smallint)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_SMALLINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->i = value->valuedouble;
return TSDB_CODE_SUCCESS;
}
// int
if (strcasecmp(typeStr, "i32") == 0 || strcasecmp(typeStr, "int") == 0) {
if (!IS_VALID_INT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(int)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_INT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->i = value->valuedouble;
return TSDB_CODE_SUCCESS;
}
// bigint
if (strcasecmp(typeStr, "i64") == 0 || strcasecmp(typeStr, "bigint") == 0) {
pVal->type = TSDB_DATA_TYPE_BIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
if (value->valuedouble >= (double)INT64_MAX) {
pVal->i = INT64_MAX;
} else if (value->valuedouble <= (double)INT64_MIN) {
pVal->i = INT64_MIN;
} else {
pVal->i = value->valuedouble;
}
return TSDB_CODE_SUCCESS;
}
// float
if (strcasecmp(typeStr, "f32") == 0 || strcasecmp(typeStr, "float") == 0) {
if (!IS_VALID_FLOAT(value->valuedouble)) {
uError("OTD:JSON value(%f) cannot fit in type(float)", value->valuedouble);
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE;
}
pVal->type = TSDB_DATA_TYPE_FLOAT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->f = value->valuedouble;
return TSDB_CODE_SUCCESS;
}
// double
if (strcasecmp(typeStr, "f64") == 0 || strcasecmp(typeStr, "double") == 0) {
pVal->type = TSDB_DATA_TYPE_DOUBLE;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
pVal->d = value->valuedouble;
return TSDB_CODE_SUCCESS;
}
// if reach here means type is unsupported
uError("OTD:invalid type(%s) for JSON Number", typeStr);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
static int32_t smlConvertJSONString(SSmlKv *pVal, char *typeStr, cJSON *value) {
if (strcasecmp(typeStr, "binary") == 0) {
pVal->type = TSDB_DATA_TYPE_BINARY;
} else if (strcasecmp(typeStr, "nchar") == 0) {
pVal->type = TSDB_DATA_TYPE_NCHAR;
} else {
uError("OTD:invalid type(%s) for JSON String", typeStr);
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
pVal->length = (int16_t)strlen(value->valuestring);
if (pVal->type == TSDB_DATA_TYPE_BINARY && pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
if (pVal->type == TSDB_DATA_TYPE_NCHAR &&
pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
pVal->value = value->valuestring;
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseValueFromJSONObj(cJSON *root, SSmlKv *kv) {
int32_t ret = TSDB_CODE_SUCCESS;
int32_t size = cJSON_GetArraySize(root);
if (size != OTD_JSON_SUB_FIELDS_NUM) {
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *value = cJSON_GetObjectItem(root, "value");
if (value == NULL) {
return TSDB_CODE_TSC_INVALID_JSON;
}
cJSON *type = cJSON_GetObjectItem(root, "type");
if (!cJSON_IsString(type)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
switch (value->type) {
case cJSON_True:
case cJSON_False: {
ret = smlConvertJSONBool(kv, type->valuestring, value);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
break;
}
case cJSON_Number: {
ret = smlConvertJSONNumber(kv, type->valuestring, value);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
break;
}
case cJSON_String: {
ret = smlConvertJSONString(kv, type->valuestring, value);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
break;
}
default:
return TSDB_CODE_TSC_INVALID_JSON_TYPE;
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) {
switch (root->type) {
case cJSON_True:
case cJSON_False: {
kv->type = TSDB_DATA_TYPE_BOOL;
kv->length = (int16_t)tDataTypes[kv->type].bytes;
kv->i = root->valueint;
break;
}
case cJSON_Number: {
kv->type = TSDB_DATA_TYPE_DOUBLE;
kv->length = (int16_t)tDataTypes[kv->type].bytes;
kv->d = root->valuedouble;
break;
}
case cJSON_String: {
/* set default JSON type to binary/nchar according to
* user configured parameter tsDefaultJSONStrType
*/
char *tsDefaultJSONStrType = "nchar"; // todo
smlConvertJSONString(kv, tsDefaultJSONStrType, root);
break;
}
case cJSON_Object: {
int32_t ret = smlParseValueFromJSONObj(root, kv);
if (ret != TSDB_CODE_SUCCESS) {
uError("OTD:Failed to parse value from JSON Obj");
return ret;
}
break;
}
default:
return TSDB_CODE_TSC_INVALID_JSON;
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseColsFromJSON(cJSON *root, SSmlKv *kv) {
cJSON *metricVal = cJSON_GetObjectItem(root, "value");
if (metricVal == NULL) {
return TSDB_CODE_TSC_INVALID_JSON;
}
int32_t ret = smlParseValueFromJSON(metricVal, kv);
if (ret != TSDB_CODE_SUCCESS) {
return ret;
}
return TSDB_CODE_SUCCESS;
}
static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
static int32_t smlParseTagsFromJSON(SSmlHandle *info, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
cJSON *tags = cJSON_GetObjectItem(root, "tags");
if (unlikely(tags == NULL || tags->type != cJSON_Object)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
// add measure to tags to identify one child table
cJSON *cMeasure = cJSON_AddStringToObject(tags, JSON_METERS_NAME, elements->measure);
if(unlikely(cMeasure == NULL)){
return TSDB_CODE_TSC_INVALID_JSON;
}
elements->tags = (char*)tags;
if(is_same_child_table_json(elements->tags, info->preLine.tags) == 0){
cJSON_DeleteItemFromObjectCaseSensitive(tags, JSON_METERS_NAME);
if(is_same_child_table_telnet(elements, &info->preLine) == 0){
return TSDB_CODE_SUCCESS;
}
cJSON_DeleteItemFromObjectCaseSensitive(tags, JSON_METERS_NAME);
bool isSameMeasure = IS_SAME_SUPER_TABLE;
@ -424,31 +112,16 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo
taosArraySetSize(preLineKV, 0);
}
int32_t tagNum = cJSON_GetArraySize(tags);
SArray *tags = smlJsonParseTags(elements->tags, elements->tags + elements->tagsLen);
int32_t tagNum = taosArrayGetSize(tags);
for (int32_t i = 0; i < tagNum; ++i) {
cJSON *tag = cJSON_GetArrayItem(tags, i);
if (unlikely(tag == NULL)) {
return TSDB_CODE_TSC_INVALID_JSON;
}
// if(unlikely(tag == cMeasure)) continue;
size_t keyLen = strlen(tag->string);
if (unlikely(IS_INVALID_COL_LEN(keyLen))) {
uError("OTD:Tag key length is 0 or too large than 64");
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
// add kv to SSmlKv
SSmlKv kv ={.key = tag->string, .keyLen = keyLen};
// value
ret = smlParseValueFromJSON(tag, &kv);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
return ret;
}
SSmlKv kv = *(SSmlKv*)taosArrayGet(tags, i);
if(info->dataFormat){
if(unlikely(cnt + 1 > info->currSTableMeta->tableInfo.numOfTags)){
info->dataFormat = false;
info->reRun = true;
taosArrayDestroy(tags);
return TSDB_CODE_SUCCESS;
}
@ -456,6 +129,7 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo
if(unlikely(cnt >= taosArrayGetSize(preLineKV))) {
info->dataFormat = false;
info->reRun = true;
taosArrayDestroy(tags);
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = (SSmlKv *)taosArrayGet(preLineKV, cnt);
@ -471,6 +145,7 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
taosArrayDestroy(tags);
return TSDB_CODE_SUCCESS;
}
}else{
@ -478,6 +153,7 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo
if(unlikely(cnt >= taosArrayGetSize(superKV))) {
info->dataFormat = false;
info->reRun = true;
taosArrayDestroy(tags);
return TSDB_CODE_SUCCESS;
}
SSmlKv *preKV = (SSmlKv *)taosArrayGet(superKV, cnt);
@ -491,6 +167,7 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo
if(unlikely(!IS_SAME_KEY)){
info->dataFormat = false;
info->reRun = true;
taosArrayDestroy(tags);
return TSDB_CODE_SUCCESS;
}
}else{
@ -503,8 +180,9 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo
}
cnt++;
}
taosArrayDestroy(tags);
SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements->tags, POINTER_BYTES, is_same_child_table_json);
SSmlTableInfo *tinfo = (SSmlTableInfo *)nodeListGet(info->childTables, elements, POINTER_BYTES, is_same_child_table_telnet);
if (unlikely(tinfo == NULL)) {
tinfo = smlBuildTableInfo(1, elements->measure, elements->measureLen);
if (unlikely(!tinfo)) {
@ -522,47 +200,224 @@ static int32_t smlParseTagsFromJSON(SSmlHandle *info, cJSON *root, SSmlLineInfo
}
}
nodeListSet(&info->childTables, tags, POINTER_BYTES, tinfo, is_same_child_table_json);
SSmlLineInfo *key = (SSmlLineInfo *)taosMemoryMalloc(sizeof(SSmlLineInfo));
*key = *elements;
tinfo->key = key;
nodeListSet(&info->childTables, key, POINTER_BYTES, tinfo, is_same_child_table_telnet);
}
if (info->dataFormat) info->currTableDataCtx = tinfo->tableDataCtx;
return ret;
}
static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlLineInfo *elements) {
static char* smlJsonGetObj(char *payload){
int leftBracketCnt = 0;
while(*payload) {
if (unlikely(*payload == '{')) {
leftBracketCnt++;
payload++;
continue;
}
if (unlikely(*payload == '}')) {
leftBracketCnt--;
payload++;
if (leftBracketCnt == 0) {
return payload;
} else if (leftBracketCnt < 0) {
return NULL;
}
continue;
}
payload++;
}
return NULL;
}
static inline void smlJsonParseObjFirst(char **start, SSmlLineInfo *element, int8_t *offset){
int index = 0;
while(*(*start)){
if((*start)[0] != '"'){
(*start)++;
continue;
}
if(unlikely(index >= 4)) {
uError("index >= 4, %s", *start)
break;
}
char *sTmp = *start;
if((*start)[1] == 'm' && (*start)[2] == 'e' && (*start)[3] == 't'
&& (*start)[4] == 'r' && (*start)[5] == 'i' && (*start)[6] == 'c' && (*start)[7] == '"'){
(*start) += 8;
bool isInQuote = false;
while(*(*start)){
if(unlikely(!isInQuote && *(*start) == '"')){
(*start)++;
offset[index++] = *start - sTmp;
element->measure = (*start);
isInQuote = true;
continue;
}
if(unlikely(isInQuote && *(*start) == '"')){
element->measureLen = (*start) - element->measure;
break;
}
(*start)++;
}
}else if((*start)[1] == 't' && (*start)[2] == 'i' && (*start)[3] == 'm'
&& (*start)[4] == 'e' && (*start)[5] == 's' && (*start)[6] == 't'
&& (*start)[7] == 'a' && (*start)[8] == 'm' && (*start)[9] == 'p' && (*start)[10] == '"'){
(*start) += 11;
bool hasColon = false;
while(*(*start)){
if(unlikely(!hasColon && *(*start) == ':')){
(*start)++;
JUMP_JSON_SPACE((*start))
offset[index++] = *start - sTmp;
element->timestamp = (*start);
hasColon = true;
continue;
}
if(unlikely(hasColon && (*(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32))){
element->timestampLen = (*start) - element->timestamp;
break;
}
(*start)++;
}
}else if((*start)[1] == 'v' && (*start)[2] == 'a' && (*start)[3] == 'l'
&& (*start)[4] == 'u' && (*start)[5] == 'e' && (*start)[6] == '"'){
(*start) += 7;
bool hasColon = false;
while(*(*start)){
if(unlikely(!hasColon && *(*start) == ':')){
(*start)++;
JUMP_JSON_SPACE((*start))
offset[index++] = *start - sTmp;
element->cols = (*start);
hasColon = true;
continue;
}
if(unlikely(hasColon && (*(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32))){
element->colsLen = (*start) - element->cols;
break;
}
(*start)++;
}
}else if((*start)[1] == 't' && (*start)[2] == 'a' && (*start)[3] == 'g'
&& (*start)[4] == 's' && (*start)[5] == '"'){
(*start) += 6;
while(*(*start)){
if(unlikely(*(*start) == ':')){
(*start)++;
JUMP_JSON_SPACE((*start))
offset[index++] = *start - sTmp;
element->tags = (*start);
char* tmp = smlJsonGetObj((*start));
if(tmp){
element->tagsLen = tmp - (*start);
*start = tmp;
}
break;
}
(*start)++;
}
}
if(*(*start) == '}'){
(*start)++;
break;
}
(*start)++;
}
}
static inline void smlJsonParseObj(char **start, SSmlLineInfo *element, int8_t *offset){
int index = 0;
while(*(*start)){
if((*start)[0] != '"'){
(*start)++;
continue;
}
if(unlikely(index >= 4)) {
uError("index >= 4, %s", *start)
break;
}
if((*start)[1] == 'm'){
(*start) += offset[index++];
element->measure = *start;
while(*(*start)){
if(unlikely(*(*start) == '"')){
element->measureLen = (*start) - element->measure;
break;
}
(*start)++;
}
}else if((*start)[1] == 't' && (*start)[2] == 'i'){
(*start) += offset[index++];
element->timestamp = *start;
while(*(*start)){
if(unlikely(*(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32)){
element->timestampLen = (*start) - element->timestamp;
break;
}
(*start)++;
}
}else if((*start)[1] == 'v'){
(*start) += offset[index++];
element->cols = *start;
while(*(*start)){
if(unlikely( *(*start) == ',' || *(*start) == '}' || (*(*start)) <= 32)){
element->colsLen = (*start) - element->cols;
break;
}
(*start)++;
}
}else if((*start)[1] == 't' && (*start)[2] == 'a'){
(*start) += offset[index++];
element->tags = (*start);
char* tmp = smlJsonGetObj((*start));
if(tmp){
element->tagsLen = tmp - (*start);
*start = tmp;
}
break;
}
if(*(*start) == '}'){
(*start)++;
break;
}
(*start)++;
}
}
static int32_t smlParseJSONString(SSmlHandle *info, char **start, SSmlLineInfo *elements) {
int32_t ret = TSDB_CODE_SUCCESS;
int32_t size = cJSON_GetArraySize(root);
// outmost json fields has to be exactly 4
if (unlikely(size != OTD_JSON_FIELDS_NUM)) {
uError("OTD:0x%" PRIx64 " Invalid number of JSON fields in data point %d", info->id, size);
return TSDB_CODE_TSC_INVALID_JSON;
if(info->offset[0] == 0){
smlJsonParseObjFirst(start, elements, info->offset);
}else{
smlJsonParseObj(start, elements, info->offset);
}
if(**start == '\0') return TSDB_CODE_SUCCESS;
// Parse metric
ret = smlParseMetricFromJSON(info, root, elements);
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric from JSON payload", info->id);
return ret;
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = (size_t)elements->colsLen};
if (smlParseNumber(&kv, &info->msgBuf)) {
kv.length = (int16_t)tDataTypes[kv.type].bytes;
}else{
return TSDB_CODE_TSC_INVALID_VALUE;
}
uDebug("OTD:0x%" PRIx64 " Parse metric from JSON payload finished", info->id);
// Parse metric value
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN};
ret = smlParseColsFromJSON(root, &kv);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse metric value from JSON payload", info->id);
return ret;
}
uDebug("OTD:0x%" PRIx64 " Parse metric value from JSON payload finished", info->id);
// Parse tags
ret = smlParseTagsFromJSON(info, root, elements);
ret = smlParseTagsFromJSON(info, elements);
if (unlikely(ret)) {
uError("OTD:0x%" PRIx64 " Unable to parse tags from JSON payload", info->id);
return ret;
}
uDebug("OTD:0x%" PRIx64 " Parse tags from JSON payload finished", info->id);
if(unlikely(info->reRun)){
return TSDB_CODE_SUCCESS;
@ -570,12 +425,11 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlLineInfo *e
// Parse timestamp
// notice!!! put ts back to tag to ensure get meta->precision
int64_t ts = smlParseTSFromJSON(info, root);
int64_t ts = smlParseOpenTsdbTime(info, elements->timestamp, elements->timestampLen);
if (unlikely(ts < 0)) {
uError("OTD:0x%" PRIx64 " Unable to parse timestamp from JSON payload", info->id);
return TSDB_CODE_INVALID_TIMESTAMP;
}
uDebug("OTD:0x%" PRIx64 " Parse timestamp from JSON payload finished", info->id);
SSmlKv kvTs = { .key = TS, .keyLen = TS_LEN, .type = TSDB_DATA_TYPE_TIMESTAMP, .i = ts, .length = (size_t)tDataTypes[TSDB_DATA_TYPE_TIMESTAMP].bytes};
if(info->dataFormat){
@ -603,46 +457,35 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlLineInfo *e
}
int32_t smlParseJSON(SSmlHandle *info, char *payload) {
int32_t payloadNum = 0;
int32_t payloadNum = 1 << 15;
int32_t ret = TSDB_CODE_SUCCESS;
if (unlikely(payload == NULL)) {
uError("SML:0x%" PRIx64 " empty JSON Payload", info->id);
return TSDB_CODE_TSC_INVALID_JSON;
}
info->root = cJSON_Parse(payload);
if (unlikely(info->root == NULL)) {
uError("SML:0x%" PRIx64 " parse json failed:%s", info->id, payload);
return TSDB_CODE_TSC_INVALID_JSON;
}
// multiple data points must be sent in JSON array
if (cJSON_IsArray(info->root)) {
payloadNum = cJSON_GetArraySize(info->root);
} else if (cJSON_IsObject(info->root)) {
payloadNum = 1;
} else {
uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
return TSDB_CODE_TSC_INVALID_JSON;
}
int32_t i = 0;
while (i < payloadNum) {
cJSON *dataPoint = (payloadNum == 1 && cJSON_IsObject(info->root)) ? info->root : cJSON_GetArrayItem(info->root, i);
int cnt = 0;
char *dataPointStart = payload;
while (1) {
if(info->dataFormat) {
SSmlLineInfo element = {0};
ret = smlParseJSONString(info, dataPoint, &element);
ret = smlParseJSONString(info, &dataPointStart, &element);
}else{
ret = smlParseJSONString(info, dataPoint, info->lines + i);
if(cnt >= payloadNum){
payloadNum = payloadNum << 1;
void* tmp = taosMemoryRealloc(info->lines, payloadNum * sizeof(SSmlLineInfo));
if(tmp != NULL){
info->lines = (SSmlLineInfo*)tmp;
}
}
ret = smlParseJSONString(info, &dataPointStart, info->lines + cnt);
}
if (unlikely(ret != TSDB_CODE_SUCCESS)) {
uError("SML:0x%" PRIx64 " Invalid JSON Payload", info->id);
return ret;
}
if(*dataPointStart == '\0') break;
if(unlikely(info->reRun)){
i = 0;
cnt = 0;
dataPointStart = payload;
info->lineNum = payloadNum;
ret = smlClearForRerun(info);
if(ret != TSDB_CODE_SUCCESS){
@ -650,8 +493,9 @@ int32_t smlParseJSON(SSmlHandle *info, char *payload) {
}
continue;
}
i++;
cnt++;
}
info->lineNum = cnt;
return TSDB_CODE_SUCCESS;
}
}

View File

@ -120,34 +120,58 @@ static int64_t smlParseInfluxTime(SSmlHandle *info, const char *data, int32_t le
return ts;
}
static int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
// binary
if (smlIsBinary(pVal->value, pVal->length)) {
pVal->type = TSDB_DATA_TYPE_BINARY;
pVal->length -= BINARY_ADD_LEN;
if (pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
int32_t smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) {
if (pVal->value[0] == '"'){ // binary
if (pVal->length >= 2 && pVal->value[pVal->length - 1] == '"') {
pVal->type = TSDB_DATA_TYPE_BINARY;
pVal->length -= BINARY_ADD_LEN;
if (pVal->length > TSDB_MAX_BINARY_LEN - VARSTR_HEADER_SIZE) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
pVal->value += (BINARY_ADD_LEN - 1);
return TSDB_CODE_SUCCESS;
}
pVal->value += (BINARY_ADD_LEN - 1);
return TSDB_CODE_SUCCESS;
}
// nchar
if (smlIsNchar(pVal->value, pVal->length)) {
pVal->type = TSDB_DATA_TYPE_NCHAR;
pVal->length -= NCHAR_ADD_LEN;
if (pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
pVal->value += (NCHAR_ADD_LEN - 1);
return TSDB_CODE_SUCCESS;
return TSDB_CODE_TSC_INVALID_VALUE;
}
// bool
if (smlParseBool(pVal)) {
pVal->type = TSDB_DATA_TYPE_BOOL;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return TSDB_CODE_SUCCESS;
if(pVal->value[0] == 'l' || pVal->value[0] == 'L'){ // nchar
if (pVal->value[1] == '"' && pVal->value[pVal->length - 1] == '"' && pVal->length >= 3){
pVal->type = TSDB_DATA_TYPE_NCHAR;
pVal->length -= NCHAR_ADD_LEN;
if (pVal->length > (TSDB_MAX_NCHAR_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) {
return TSDB_CODE_PAR_INVALID_VAR_COLUMN_LEN;
}
pVal->value += (NCHAR_ADD_LEN - 1);
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_TSC_INVALID_VALUE;
}
if (pVal->value[0] == 't' || pVal->value[0] == 'T'){
if(pVal->length == 1 || (pVal->length == 4 && (pVal->value[1] == 'r' || pVal->value[1] == 'R')
&& (pVal->value[2] == 'u' || pVal->value[2] == 'U')
&& (pVal->value[3] == 'e' || pVal->value[3] == 'E'))){
pVal->i = TSDB_TRUE;
pVal->type = TSDB_DATA_TYPE_BOOL;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_TSC_INVALID_VALUE;
}
if (pVal->value[0] == 'f' || pVal->value[0] == 'F'){
if(pVal->length == 1 || (pVal->length == 5 && (pVal->value[1] == 'a' || pVal->value[1] == 'A')
&& (pVal->value[2] == 'l' || pVal->value[2] == 'L')
&& (pVal->value[3] == 's' || pVal->value[3] == 'S')
&& (pVal->value[4] == 'e' || pVal->value[4] == 'E'))){
pVal->i = TSDB_FALSE;
pVal->type = TSDB_DATA_TYPE_BOOL;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
return TSDB_CODE_SUCCESS;
}
return TSDB_CODE_TSC_INVALID_VALUE;
}
// number
if (smlParseNumber(pVal, msg)) {
pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
@ -432,7 +456,7 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd,
bool isInQuote = false;
while (*sql < sqlEnd) {
// parse value
if (IS_QUOTE(*sql)) {
if (unlikely(IS_QUOTE(*sql))) {
isInQuote = !isInQuote;
(*sql)++;
continue;
@ -468,6 +492,7 @@ static int32_t smlParseColKv(SSmlHandle *info, char **sql, char *sqlEnd,
SSmlKv kv = {.key = key, .keyLen = keyLen, .value = value, .length = valueLen};
int32_t ret = smlParseValue(&kv, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) {
smlBuildInvalidDataMsg(&info->msgBuf, "smlParseValue error", value);
return ret;
}

View File

@ -27,7 +27,7 @@ int32_t is_same_child_table_telnet(const void *a, const void *b){
&& ((t1->tagsLen == t2->tagsLen) && memcmp(t1->tags, t2->tags, t1->tagsLen) == 0)) ? 0 : 1;
}
static int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
int64_t smlParseOpenTsdbTime(SSmlHandle *info, const char *data, int32_t len) {
uint8_t toPrecision = info->currSTableMeta ? info->currSTableMeta->tableInfo.precision : TSDB_TIME_PRECISION_NANO;
if (unlikely(!data)) {
@ -283,11 +283,10 @@ int32_t smlParseTelnetString(SSmlHandle *info, char *sql, char *sqlEnd, SSmlLine
}
SSmlKv kv = {.key = VALUE, .keyLen = VALUE_LEN, .value = elements->cols, .length = (size_t)elements->colsLen};
if (smlParseNumber(&kv, &info->msgBuf)) {
kv.length = (int16_t)tDataTypes[kv.type].bytes;
}else{
if (smlParseValue(&kv, &info->msgBuf) != TSDB_CODE_SUCCESS) {
return TSDB_CODE_TSC_INVALID_VALUE;
}
JUMP_SPACE(sql, sqlEnd)
elements->tags = sql;

View File

@ -239,6 +239,7 @@ TEST(testCase, smlParseCols_Test) {
info->protocol = TSDB_SML_LINE_PROTOCOL;
info->dataFormat = false;
SSmlLineInfo elements = {0};
info->msgBuf = msgBuf;
const char *data =
"st,t=1 cb\\=in=\"pass\\,it "
@ -413,28 +414,28 @@ TEST(testCase, smlParseCols_Test) {
smlDestroyInfo(info);
}
TEST(testCase, smlGetTimestampLen_Test) {
uint8_t len = smlGetTimestampLen(0);
ASSERT_EQ(len, 1);
len = smlGetTimestampLen(1);
ASSERT_EQ(len, 1);
len = smlGetTimestampLen(10);
ASSERT_EQ(len, 2);
len = smlGetTimestampLen(390);
ASSERT_EQ(len, 3);
len = smlGetTimestampLen(-1);
ASSERT_EQ(len, 1);
len = smlGetTimestampLen(-10);
ASSERT_EQ(len, 2);
len = smlGetTimestampLen(-390);
ASSERT_EQ(len, 3);
}
//TEST(testCase, smlGetTimestampLen_Test) {
// uint8_t len = smlGetTimestampLen(0);
// ASSERT_EQ(len, 1);
//
// len = smlGetTimestampLen(1);
// ASSERT_EQ(len, 1);
//
// len = smlGetTimestampLen(10);
// ASSERT_EQ(len, 2);
//
// len = smlGetTimestampLen(390);
// ASSERT_EQ(len, 3);
//
// len = smlGetTimestampLen(-1);
// ASSERT_EQ(len, 1);
//
// len = smlGetTimestampLen(-10);
// ASSERT_EQ(len, 2);
//
// len = smlGetTimestampLen(-390);
// ASSERT_EQ(len, 3);
//}
TEST(testCase, smlParseNumber_Test) {
SSmlKv kv = {0};
@ -486,138 +487,58 @@ TEST(testCase, smlParseTelnetLine_error_Test) {
smlDestroyInfo(info);
}
TEST(testCase, smlParseTelnetLine_json_error_Test) {
SSmlHandle *info = smlBuildSmlInfo(NULL);
info->protocol = TSDB_SML_JSON_PROTOCOL;
info->dataFormat = false;
ASSERT_NE(info, nullptr);
const char *sql[] = {
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 13468464009999333322222223,\n"
" \"value\": 18,\n"
" \"tags\": {\n"
" \"host\": \"web01\",\n"
" \"dc\": \"lga\"\n"
" }\n"
" },\n"
"]",
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400i,\n"
" \"value\": 18,\n"
" \"tags\": {\n"
" \"host\": \"web01\",\n"
" \"dc\": \"lga\"\n"
" }\n"
" },\n"
"]",
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": 18,\n"
" \"tags\": {\n"
" \"groupid\": { \n"
" \"value\" : 2,\n"
" \"type\" : \"nchar\"\n"
" },\n"
" \"location\": { \n"
" \"value\" : \"北京\",\n"
" \"type\" : \"binary\"\n"
" },\n"
" \"id\": \"d1001\"\n"
" }\n"
" },\n"
"]",
};
int ret = TSDB_CODE_SUCCESS;
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
SSmlLineInfo elements = {0};
ret = smlParseTelnetString(info, (char *)sql[i], (char*)(sql[i] + strlen(sql[i])), &elements);
ASSERT_NE(ret, 0);
}
smlDestroyInfo(info);
}
TEST(testCase, smlParseTelnetLine_diff_json_type1_Test) {
SSmlHandle *info = smlBuildSmlInfo(NULL);
info->protocol = TSDB_SML_JSON_PROTOCOL;
info->dataFormat = false;
ASSERT_NE(info, nullptr);
const char *sql[] = {
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": 18,\n"
" \"tags\": {\n"
" \"host\": \"lga\"\n"
" }\n"
" },\n"
"]",
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": 18,\n"
" \"tags\": {\n"
" \"host\": 8\n"
" }\n"
" },\n"
"]",
};
int ret = TSDB_CODE_SUCCESS;
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
SSmlLineInfo elements = {0};
ret = smlParseTelnetString(info, (char *)sql[i], (char*)(sql[i] + strlen(sql[i])), &elements);
if (ret != TSDB_CODE_SUCCESS) break;
}
ASSERT_NE(ret, 0);
smlDestroyInfo(info);
}
TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) {
SSmlHandle *info = smlBuildSmlInfo(NULL);
info->protocol = TSDB_SML_JSON_PROTOCOL;
info->dataFormat = false;
ASSERT_NE(info, nullptr);
const char *sql[] = {
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": 18,\n"
" \"tags\": {\n"
" \"host\": \"lga\"\n"
" }\n"
" },\n"
"]",
"[\n"
" {\n"
" \"metric\": \"sys.cpu.nice\",\n"
" \"timestamp\": 1346846400,\n"
" \"value\": \"18\",\n"
" \"tags\": {\n"
" \"host\": \"fff\"\n"
" }\n"
" },\n"
"]",
"[{\"metric\":\"sys.cpu.nice\",\"timestamp\": 1346846400,\"value\": 18,\"tags\": {\"host\": \"lga\"}},{\"metric\": \"sys.sdfa\",\"timestamp\": 1346846400,\"value\": \"18\",\"tags\": {\"host\": 8932}},]",
};
int ret = TSDB_CODE_SUCCESS;
for (int i = 0; i < sizeof(sql) / sizeof(sql[0]); i++) {
SSmlLineInfo elements = {0};
ret = smlParseTelnetString(info, (char *)sql[i], (char*)(sql[i] + strlen(sql[i])), &elements);
if (ret != TSDB_CODE_SUCCESS) break;
char *dataPointStart = (char *)sql[i];
int8_t offset[4] = {0};
while (1) {
SSmlLineInfo elements = {0};
if(offset[0] == 0){
smlJsonParseObjFirst(&dataPointStart, &elements, offset);
}else{
smlJsonParseObj(&dataPointStart, &elements, offset);
}
if(*dataPointStart == '\0') break;
SArray *tags = smlJsonParseTags(elements.tags, elements.tags + elements.tagsLen);
size_t num = taosArrayGetSize(tags);
ASSERT_EQ(num, 1);
taosArrayDestroy(tags);
}
}
ASSERT_NE(ret, 0);
smlDestroyInfo(info);
}
TEST(testCase, smlParseNumber_performance_Test) {
char msg[256] = {0};
SSmlMsgBuf msgBuf;
SSmlKv kv;
char* str[3] = {"2893f64", "2323u32", "93u8"};
for (int i = 0; i < 3; ++i) {
int64_t t1 = taosGetTimestampUs();
for (int j = 0; j < 10000000; ++j) {
kv.value = str[i];
kv.length = strlen(str[i]);
smlParseNumber(&kv, &msgBuf);
}
printf("smlParseNumber:%s cost:%" PRId64, str[i], taosGetTimestampUs() - t1);
printf("\n");
int64_t t2 = taosGetTimestampUs();
for (int j = 0; j < 10000000; ++j) {
kv.value = str[i];
kv.length = strlen(str[i]);
smlParseNumberOld(&kv, &msgBuf);
}
printf("smlParseNumberOld:%s cost:%" PRId64, str[i], taosGetTimestampUs() - t2);
printf("\n\n");
}
}

View File

@ -76,13 +76,14 @@ class TDTestCase:
tdSql.query(f"select * from {dbname}.`sys.cpu.nice` order by _ts")
tdSql.checkRows(2)
tdSql.checkData(0, 1, 9.000000000)
tdSql.checkData(0, 2, "lga")
tdSql.checkData(0, 3, "web02")
tdSql.checkData(0, 4, None)
tdSql.checkData(0, 2, "web02")
tdSql.checkData(0, 3, None)
tdSql.checkData(0, 4, "lga")
tdSql.checkData(1, 1, 18.000000000)
tdSql.checkData(1, 2, "lga")
tdSql.checkData(1, 3, "web01")
tdSql.checkData(1, 4, "t1")
tdSql.checkData(1, 2, "web01")
tdSql.checkData(1, 3, "t1")
tdSql.checkData(0, 4, "lga")
tdSql.query(f"select * from {dbname}.macylr")
tdSql.checkRows(2)

View File

@ -156,28 +156,7 @@ int smlProcess_json3_Test() {
taos_free_result(pRes);
const char *sql[] = {
"{\"metric\":\"meter_current1\",\"timestamp\":{\"value\":1662344042,\"type\":\"s\"},\"value\":{\"value\":10.3,\"type\":\"i64\"},\"tags\":{\"t1\":{\"value\":2,\"type\":\"bigint\"},\"t2\":{\"value\":2,\"type\":\"int\"},\"t3\":{\"value\":2,\"type\":\"i16\"},\"t4\":{\"value\":2,\"type\":\"i8\"},\"t5\":{\"value\":2,\"type\":\"f32\"},\"t6\":{\"value\":2,\"type\":\"double\"},\"t7\":{\"value\":\"8323\",\"type\":\"binary\"},\"t8\":{\"value\":\"北京\",\"type\":\"nchar\"},\"t9\":{\"value\":true,\"type\":\"bool\"},\"id\":\"d1001\"}}"};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes);
taos_free_result(pRes);
taos_close(taos);
return code;
}
int smlProcess_json4_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
const char *sql[] = {
"{\"metric\":\"meter_current2\",\"timestamp\":{\"value\":1662344042000,\"type\":\"ms\"},\"value\":\"ni\",\"tags\":{\"t1\":{\"value\":20,\"type\":\"i64\"},\"t2\":{\"value\":25,\"type\":\"i32\"},\"t3\":{\"value\":2,\"type\":\"smallint\"},\"t4\":{\"value\":2,\"type\":\"tinyint\"},\"t5\":{\"value\":2,\"type\":\"float\"},\"t6\":{\"value\":0.2,\"type\":\"f64\"},\"t7\":\"nsj\",\"t8\":{\"value\":\"北京\",\"type\":\"nchar\"},\"t9\":false,\"id\":\"d1001\"}}"
"[{\"metric\":\"sys.cpu.nice\",\"timestamp\":0,\"value\":\"18\",\"tags\":{\"host\":\"web01\",\"id\":\"t1\",\"dc\":\"lga\"}}]"
};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_NANO_SECONDS);
@ -686,52 +665,6 @@ int sml_oom_Test() {
return code;
}
int sml_16368_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
const char *sql[] = {
"[{\"metric\": \"st123456\", \"timestamp\": {\"value\": 1626006833639000, \"type\": \"us\"}, \"value\": 1, "
"\"tags\": {\"t1\": 3, \"t2\": {\"value\": 4, \"type\": \"double\"}, \"t3\": {\"value\": \"t3\", \"type\": "
"\"binary\"}}},"
"{\"metric\": \"st123456\", \"timestamp\": {\"value\": 1626006833739000, \"type\": \"us\"}, \"value\": 2, "
"\"tags\": {\"t1\": {\"value\": 4, \"type\": \"double\"}, \"t3\": {\"value\": \"t4\", \"type\": \"binary\"}, "
"\"t2\": {\"value\": 5, \"type\": \"double\"}, \"t4\": {\"value\": 5, \"type\": \"double\"}}},"
"{\"metric\": \"stb_name\", \"timestamp\": {\"value\": 1626006833639100, \"type\": \"us\"}, \"value\": 3, "
"\"tags\": {\"t2\": {\"value\": 5, \"type\": \"double\"}, \"t3\": {\"value\": \"ste\", \"type\": \"nchar\"}}},"
"{\"metric\": \"stf567890\", \"timestamp\": {\"value\": 1626006833639200, \"type\": \"us\"}, \"value\": 4, "
"\"tags\": {\"t1\": {\"value\": 4, \"type\": \"bigint\"}, \"t3\": {\"value\": \"t4\", \"type\": \"binary\"}, "
"\"t2\": {\"value\": 5, \"type\": \"double\"}, \"t4\": {\"value\": 5, \"type\": \"double\"}}},"
"{\"metric\": \"st123456\", \"timestamp\": {\"value\": 1626006833639300, \"type\": \"us\"}, \"value\": "
"{\"value\": 5, \"type\": \"double\"}, \"tags\": {\"t1\": {\"value\": 4, \"type\": \"double\"}, \"t2\": 5.0, "
"\"t3\": {\"value\": \"t4\", \"type\": \"binary\"}}},"
"{\"metric\": \"stb_name\", \"timestamp\": {\"value\": 1626006833639400, \"type\": \"us\"}, \"value\": "
"{\"value\": 6, \"type\": \"double\"}, \"tags\": {\"t2\": 5.0, \"t3\": {\"value\": \"ste2\", \"type\": "
"\"nchar\"}}},"
"{\"metric\": \"stb_name\", \"timestamp\": {\"value\": 1626006834639400, \"type\": \"us\"}, \"value\": "
"{\"value\": 7, \"type\": \"double\"}, \"tags\": {\"t2\": {\"value\": 5.0, \"type\": \"double\"}, \"t3\": "
"{\"value\": \"ste2\", \"type\": \"nchar\"}}},"
"{\"metric\": \"st123456\", \"timestamp\": {\"value\": 1626006833839006, \"type\": \"us\"}, \"value\": "
"{\"value\": 8, \"type\": \"double\"}, \"tags\": {\"t1\": {\"value\": 4, \"type\": \"double\"}, \"t3\": "
"{\"value\": \"t4\", \"type\": \"binary\"}, \"t2\": {\"value\": 5, \"type\": \"double\"}, \"t4\": {\"value\": 5, "
"\"type\": \"double\"}}},"
"{\"metric\": \"st123456\", \"timestamp\": {\"value\": 1626006833939007, \"type\": \"us\"}, \"value\": "
"{\"value\": 9, \"type\": \"double\"}, \"tags\": {\"t1\": 4, \"t3\": {\"value\": \"t4\", \"type\": \"binary\"}, "
"\"t2\": {\"value\": 5, \"type\": \"double\"}, \"t4\": {\"value\": 5, \"type\": \"double\"}}}]"};
pRes = taos_schemaless_insert(taos, (char **)sql, 0, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_MICRO_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes);
taos_free_result(pRes);
taos_close(taos);
return code;
}
int sml_dup_time_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -771,214 +704,6 @@ int sml_dup_time_Test() {
return code;
}
int sml_16960_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
taos_free_result(pRes);
const char *sql[] = {
"["
"{"
"\"timestamp\":"
""
"{ \"value\": 1664418955000, \"type\": \"ms\" }"
","
"\"value\":"
""
"{ \"value\": 830525384, \"type\": \"int\" }"
","
"\"tags\": {"
"\"id\": \"stb00_0\","
"\"t0\":"
""
"{ \"value\": 83972721, \"type\": \"int\" }"
","
"\"t1\":"
""
"{ \"value\": 539147525, \"type\": \"int\" }"
","
"\"t2\":"
""
"{ \"value\": 618258572, \"type\": \"int\" }"
","
"\"t3\":"
""
"{ \"value\": -10536201, \"type\": \"int\" }"
","
"\"t4\":"
""
"{ \"value\": 349227409, \"type\": \"int\" }"
","
"\"t5\":"
""
"{ \"value\": 249347042, \"type\": \"int\" }"
"},"
"\"metric\": \"stb0\""
"},"
"{"
"\"timestamp\":"
""
"{ \"value\": 1664418955001, \"type\": \"ms\" }"
","
"\"value\":"
""
"{ \"value\": -588348364, \"type\": \"int\" }"
","
"\"tags\": {"
"\"id\": \"stb00_0\","
"\"t0\":"
""
"{ \"value\": 83972721, \"type\": \"int\" }"
","
"\"t1\":"
""
"{ \"value\": 539147525, \"type\": \"int\" }"
","
"\"t2\":"
""
"{ \"value\": 618258572, \"type\": \"int\" }"
","
"\"t3\":"
""
"{ \"value\": -10536201, \"type\": \"int\" }"
","
"\"t4\":"
""
"{ \"value\": 349227409, \"type\": \"int\" }"
","
"\"t5\":"
""
"{ \"value\": 249347042, \"type\": \"int\" }"
"},"
"\"metric\": \"stb0\""
"},"
"{"
"\"timestamp\":"
""
"{ \"value\": 1664418955002, \"type\": \"ms\" }"
","
"\"value\":"
""
"{ \"value\": -370310823, \"type\": \"int\" }"
","
"\"tags\": {"
"\"id\": \"stb00_0\","
"\"t0\":"
""
"{ \"value\": 83972721, \"type\": \"int\" }"
","
"\"t1\":"
""
"{ \"value\": 539147525, \"type\": \"int\" }"
","
"\"t2\":"
""
"{ \"value\": 618258572, \"type\": \"int\" }"
","
"\"t3\":"
""
"{ \"value\": -10536201, \"type\": \"int\" }"
","
"\"t4\":"
""
"{ \"value\": 349227409, \"type\": \"int\" }"
","
"\"t5\":"
""
"{ \"value\": 249347042, \"type\": \"int\" }"
"},"
"\"metric\": \"stb0\""
"},"
"{"
"\"timestamp\":"
""
"{ \"value\": 1664418955003, \"type\": \"ms\" }"
","
"\"value\":"
""
"{ \"value\": -811250191, \"type\": \"int\" }"
","
"\"tags\": {"
"\"id\": \"stb00_0\","
"\"t0\":"
""
"{ \"value\": 83972721, \"type\": \"int\" }"
","
"\"t1\":"
""
"{ \"value\": 539147525, \"type\": \"int\" }"
","
"\"t2\":"
""
"{ \"value\": 618258572, \"type\": \"int\" }"
","
"\"t3\":"
""
"{ \"value\": -10536201, \"type\": \"int\" }"
","
"\"t4\":"
""
"{ \"value\": 349227409, \"type\": \"int\" }"
","
"\"t5\":"
""
"{ \"value\": 249347042, \"type\": \"int\" }"
"},"
"\"metric\": \"stb0\""
"},"
"{"
"\"timestamp\":"
""
"{ \"value\": 1664418955004, \"type\": \"ms\" }"
","
"\"value\":"
""
"{ \"value\": -330340558, \"type\": \"int\" }"
","
"\"tags\": {"
"\"id\": \"stb00_0\","
"\"t0\":"
""
"{ \"value\": 83972721, \"type\": \"int\" }"
","
"\"t1\":"
""
"{ \"value\": 539147525, \"type\": \"int\" }"
","
"\"t2\":"
""
"{ \"value\": 618258572, \"type\": \"int\" }"
","
"\"t3\":"
""
"{ \"value\": -10536201, \"type\": \"int\" }"
","
"\"t4\":"
""
"{ \"value\": 349227409, \"type\": \"int\" }"
","
"\"t5\":"
""
"{ \"value\": 249347042, \"type\": \"int\" }"
"},"
"\"metric\": \"stb0\""
"}"
"]"};
pRes = taos_schemaless_insert(taos, (char **)sql, sizeof(sql) / sizeof(sql[0]), TSDB_SML_JSON_PROTOCOL,
TSDB_SML_TIMESTAMP_MILLI_SECONDS);
printf("%s result:%s\n", __FUNCTION__, taos_errstr(pRes));
int code = taos_errno(pRes);
taos_free_result(pRes);
taos_close(taos);
return code;
}
int sml_add_tag_col_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -1013,10 +738,10 @@ int sml_add_tag_col_Test() {
int smlProcess_18784_Test() {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db schemaless 1");
TAOS_RES *pRes = taos_query(taos, "create database if not exists db_18784 schemaless 1");
taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db");
pRes = taos_query(taos, "use db_18784");
taos_free_result(pRes);
const char *sql[] = {
@ -1159,7 +884,46 @@ int sml_ttl_Test() {
return code;
}
//char *str[] ={
// "",
// "f64",
// "F64",
// "f32",
// "F32",
// "i",
// "I",
// "i64",
// "I64",
// "u",
// "U",
// "u64",
// "U64",
// "i32",
// "I32",
// "u32",
// "U32",
// "i16",
// "I16",
// "u16",
// "U16",
// "i8",
// "I8",
// "u8",
// "U8",
//};
//uint8_t smlCalTypeSum(char* endptr, int32_t left){
// uint8_t sum = 0;
// for(int i = 0; i < left; i++){
// sum += endptr[i];
// }
// return sum;
//}
int main(int argc, char *argv[]) {
// for(int i = 0; i < sizeof(str)/sizeof(str[0]); i++){
// printf("str:%s \t %d\n", str[i], smlCalTypeSum(str[i], strlen(str[i])));
// }
int ret = 0;
ret = sml_ttl_Test();
ASSERT(!ret);
@ -1172,11 +936,9 @@ int main(int argc, char *argv[]) {
ret = smlProcess_json1_Test();
ASSERT(!ret);
ret = smlProcess_json2_Test();
ASSERT(!ret);
ASSERT(ret);
ret = smlProcess_json3_Test();
ASSERT(!ret);
ret = smlProcess_json4_Test();
ASSERT(!ret);
ASSERT(ret);
ret = sml_TD15662_Test();
ASSERT(!ret);
ret = sml_TD15742_Test();
@ -1185,12 +947,8 @@ int main(int argc, char *argv[]) {
ASSERT(!ret);
ret = sml_oom_Test();
ASSERT(!ret);
ret = sml_16368_Test();
ASSERT(!ret);
ret = sml_dup_time_Test();
ASSERT(!ret);
ret = sml_16960_Test();
ASSERT(!ret);
ret = sml_add_tag_col_Test();
ASSERT(!ret);
ret = smlProcess_18784_Test();