From a4629e56cecdfdda88d139f0cf3ebe66d33f6277 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 29 Apr 2022 10:58:46 +0800 Subject: [PATCH] refactor:add schemaless function --- include/common/tcommon.h | 8 +- include/common/ttypes.h | 16 +- source/client/inc/clientSml.h | 51 +- source/client/src/clientSml.c | 1335 ++++++++++------------------ source/libs/parser/src/parInsert.c | 39 +- 5 files changed, 545 insertions(+), 904 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 1c516e8a96..61f8ebae66 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -246,7 +246,13 @@ typedef struct { int32_t keyLen; uint8_t type; int16_t length; - const char* value; + union{ + const char* value; + int64_t i; + uint64_t u; + double d; + float f; + }; int32_t valueLen; } SSmlKv; diff --git a/include/common/ttypes.h b/include/common/ttypes.h index 405b20c521..377b443843 100644 --- a/include/common/ttypes.h +++ b/include/common/ttypes.h @@ -186,14 +186,14 @@ typedef struct { #define IS_NUMERIC_TYPE(_t) ((IS_SIGNED_NUMERIC_TYPE(_t)) || (IS_UNSIGNED_NUMERIC_TYPE(_t)) || (IS_FLOAT_TYPE(_t))) #define IS_MATHABLE_TYPE(_t) (IS_NUMERIC_TYPE(_t) || (_t) == (TSDB_DATA_TYPE_BOOL) || (_t) == (TSDB_DATA_TYPE_TIMESTAMP)) -#define IS_VALID_TINYINT(_t) ((_t) > INT8_MIN && (_t) <= INT8_MAX) -#define IS_VALID_SMALLINT(_t) ((_t) > INT16_MIN && (_t) <= INT16_MAX) -#define IS_VALID_INT(_t) ((_t) > INT32_MIN && (_t) <= INT32_MAX) -#define IS_VALID_BIGINT(_t) ((_t) > INT64_MIN && (_t) <= INT64_MAX) -#define IS_VALID_UTINYINT(_t) ((_t) >= 0 && (_t) < UINT8_MAX) -#define IS_VALID_USMALLINT(_t) ((_t) >= 0 && (_t) < UINT16_MAX) -#define IS_VALID_UINT(_t) ((_t) >= 0 && (_t) < UINT32_MAX) -#define IS_VALID_UBIGINT(_t) ((_t) >= 0 && (_t) < UINT64_MAX) +#define IS_VALID_TINYINT(_t) ((_t) >= INT8_MIN && (_t) <= INT8_MAX) +#define IS_VALID_SMALLINT(_t) ((_t) >= INT16_MIN && (_t) <= INT16_MAX) +#define IS_VALID_INT(_t) ((_t) >= INT32_MIN && (_t) <= INT32_MAX) +#define IS_VALID_BIGINT(_t) ((_t) >= INT64_MIN && (_t) <= INT64_MAX) +#define IS_VALID_UTINYINT(_t) ((_t) >= 0 && (_t) <= UINT8_MAX) +#define IS_VALID_USMALLINT(_t) ((_t) >= 0 && (_t) <= UINT16_MAX) +#define IS_VALID_UINT(_t) ((_t) >= 0 && (_t) <= UINT32_MAX) +#define IS_VALID_UBIGINT(_t) ((_t) >= 0 && (_t) <= UINT64_MAX) #define IS_VALID_FLOAT(_t) ((_t) >= -FLT_MAX && (_t) <= FLT_MAX) #define IS_VALID_DOUBLE(_t) ((_t) >= -DBL_MAX && (_t) <= DBL_MAX) diff --git a/source/client/inc/clientSml.h b/source/client/inc/clientSml.h index b711c837c0..c970f1e954 100644 --- a/source/client/inc/clientSml.h +++ b/source/client/inc/clientSml.h @@ -36,48 +36,55 @@ typedef struct { int32_t measureTagsLen; int32_t tagsLen; int32_t colsLen; + int32_t timestampLen; } TAOS_PARSE_ELEMENTS; typedef struct { const char *sTableName; // super table name uint8_t sTableNameLen; - char childTableName[TSDB_TABLE_NAME_LEN]; - uint64_t uid; + char childTableName[TSDB_TABLE_NAME_LEN]; + uint64_t uid; - SArray* tags; - SArray *cols; + SArray *tags; + SArray *cols; // elements are SHashObj for find by key quickly + + SArray colsColumn; // elements are cols key string } TAOS_SML_DATA_POINT_TAGS; typedef struct SSmlSTableMeta { // char *sTableName; // super table name // uint8_t sTableNameLen; - uint8_t precision; // the number of precision - SHashObj* tagHash; - SHashObj* fieldHash; + uint8_t precision; // the number of precision + SHashObj *tagHash; + SHashObj *fieldHash; } SSmlSTableMeta; +typedef struct SMsgBuf { + int32_t len; + char *buf; +} SMsgBuf; + typedef struct { - uint64_t id; + uint64_t id; - SMLProtocolType protocol; - int32_t tsType; + SMLProtocolType protocol; + int32_t tsType; - SHashObj* childTables; - SHashObj* superTables; + SHashObj *childTables; + SHashObj *superTables; - SHashObj* metaHashObj; - SHashObj* pVgHash; + SHashObj *metaHashObj; + SHashObj *pVgHash; - void* exec; + void *exec; - STscObj* taos; - SCatalog* pCatalog; - SRequestObj* pRequest; - SQuery* pQuery; + STscObj *taos; + SCatalog *pCatalog; + SRequestObj *pRequest; + SQuery *pQuery; - int32_t affectedRows; - char *msgBuf; - int16_t msgLen; + int32_t affectedRows; + SMsgBuf msgBuf; } SSmlLinesInfo; int smlInsert(TAOS* taos, SSmlLinesInfo* info); diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index dfbefcb0bf..3739c83109 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -13,6 +13,7 @@ #include "taoserror.h" #include "taos.h" #include "ttime.h" +#include "tstrbuild.h" typedef struct { @@ -31,14 +32,15 @@ typedef struct { #define SLASH '\\' #define tsMaxSQLStringLen (1024*1024) +#define TSNAMELEN 2 +#define TAGNAMELEN 3 //================================================================================================= static uint64_t linesSmlHandleId = 0; +static const char* TS = "ts"; +static const char* TAG = "tag"; + -static int32_t insertChildTablePointsBatch(void* pVoid, char* name, char* name1, SArray* pArray, SArray* pArray1, - SArray* pArray2, SArray* pArray3, size_t size, SSmlLinesInfo* info); -static int32_t doInsertChildTablePoints(void* pVoid, char* sql, char* name, SArray* pArray, SArray* pArray1, - SSmlLinesInfo* info); uint64_t genLinesSmlId() { uint64_t id; @@ -49,9 +51,15 @@ uint64_t genLinesSmlId() { return id; } +static int32_t buildInvalidDataMsg(SMsgBuf* pBuf, const char *msg1, const char *msg2) { + if(msg1) snprintf(pBuf->buf, pBuf->len, "%s:", msg1); + if(msg2) strncpy(pBuf->buf, msg2, pBuf->len); + return TSDB_CODE_SML_INVALID_DATA; +} + int compareSmlColKv(const void* p1, const void* p2) { - TAOS_SML_KV* kv1 = (TAOS_SML_KV*)p1; - TAOS_SML_KV* kv2 = (TAOS_SML_KV*)p2; + SSmlKv* kv1 = (SSmlKv *)p1; + SSmlKv* kv2 = (SSmlKv*)p2; int kvLen1 = (int)strlen(kv1->key); int kvLen2 = (int)strlen(kv2->key); int res = strncasecmp(kv1->key, kv2->key, MIN(kvLen1, kvLen2)); @@ -78,7 +86,7 @@ typedef struct { typedef struct { char sTableName[TSDB_TABLE_NAME_LEN]; - SSchema* field; + SSmlKv * field; } SAlterSTableActionInfo; typedef struct { @@ -89,94 +97,17 @@ typedef struct { }; } SSchemaAction; -static int32_t getFieldBytesFromSmlKv(TAOS_SML_KV* kv, int32_t* bytes, uint64_t id) { - if (!IS_VAR_DATA_TYPE(kv->type)) { - *bytes = tDataTypes[kv->type].bytes; - } else { - if (kv->type == TSDB_DATA_TYPE_NCHAR) { - TdUcs4 *ucs = taosMemoryMalloc(kv->length * TSDB_NCHAR_SIZE + 1); - int32_t bytesNeeded = 0; - bool succ = taosMbsToUcs4(kv->value, kv->length, ucs, kv->length * TSDB_NCHAR_SIZE, &bytesNeeded); - if (!succ) { - taosMemoryFree(ucs); - uError("SML:0x%"PRIx64" convert nchar string to UCS4_LE failed:%s", id, kv->value); - return TSDB_CODE_TSC_INVALID_VALUE; - } - taosMemoryFree(ucs); - *bytes = bytesNeeded + VARSTR_HEADER_SIZE; - } else if (kv->type == TSDB_DATA_TYPE_BINARY) { - *bytes = kv->length + VARSTR_HEADER_SIZE; - } - } - return 0; -} +static int32_t buildSmlChildTableName(TAOS_SML_DATA_POINT_TAGS *tags) { + int32_t size = taosArrayGetSize(tags->tags); + ASSERT(size > 0); + qsort(tags->tags, size, POINTER_BYTES, compareSmlColKv); -static int32_t buildSmlKvSchema(TAOS_SML_KV* smlKv, SHashObj* hash, SArray* array, SSmlLinesInfo* info) { - SSchema* pField = NULL; - size_t* pFieldIdx = taosHashGet(hash, smlKv->key, strlen(smlKv->key)); - size_t fieldIdx = -1; - int32_t code = 0; - if (pFieldIdx) { - fieldIdx = *pFieldIdx; - pField = taosArrayGet(array, fieldIdx); - - if (pField->type != smlKv->type) { - uError("SML:0x%"PRIx64" type mismatch. key %s, type %d. type before %d", info->id, smlKv->key, smlKv->type, pField->type); - return TSDB_CODE_TSC_INVALID_VALUE; - } - - int32_t bytes = 0; - code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id); - if (code != 0) { - return code; - } - pField->bytes = MAX(pField->bytes, bytes); - - } else { - SSchema field = {0}; - size_t tagKeyLen = strlen(smlKv->key); - strncpy(field.name, smlKv->key, tagKeyLen); - field.name[tagKeyLen] = '\0'; - field.type = smlKv->type; - - int32_t bytes = 0; - code = getFieldBytesFromSmlKv(smlKv, &bytes, info->id); - if (code != 0) { - return code; - } - field.bytes = bytes; - - pField = taosArrayPush(array, &field); - fieldIdx = taosArrayGetSize(array) - 1; - taosHashPut(hash, field.name, tagKeyLen, &fieldIdx, sizeof(fieldIdx)); - } - - smlKv->fieldSchemaIdx = (uint32_t)fieldIdx; - - return 0; -} - -static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableName, int* tableNameLen, - SSmlLinesInfo* info) { - uDebug("SML:0x%"PRIx64" taos_sml_insert get child table name through md5", info->id); - if (point->tagNum) { - qsort(point->tags, point->tagNum, sizeof(TAOS_SML_KV), compareSmlColKv); - } - - SStringBuilder sb; memset(&sb, 0, sizeof(sb)); - char sTableName[TSDB_TABLE_NAME_LEN] = {0}; - strncpy(sTableName, point->stableName, strlen(point->stableName)); - //strtolower(sTableName, point->stableName); - taosStringBuilderAppendString(&sb, sTableName); - for (int j = 0; j < point->tagNum; ++j) { - taosStringBuilderAppendChar(&sb, ','); - TAOS_SML_KV* tagKv = point->tags + j; - char tagName[TSDB_COL_NAME_LEN] = {0}; - strncpy(tagName, tagKv->key, strlen(tagKv->key)); - //strtolower(tagName, tagKv->key); - taosStringBuilderAppendString(&sb, tagName); - taosStringBuilderAppendChar(&sb, '='); - taosStringBuilderAppend(&sb, tagKv->value, tagKv->length); + SStringBuilder sb = {0}; + taosStringBuilderAppendStringLen(&sb, tags->sTableName, tags->sTableNameLen); + for (int j = 0; j < size; ++j) { + SSmlKv *tagKv = taosArrayGetP(tags->tags, j); + taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen); + taosStringBuilderAppendStringLen(&sb, tagKv->value, tagKv->valueLen); } size_t len = 0; char* keyJoined = taosStringBuilderGetResult(&sb, &len); @@ -186,183 +117,74 @@ static int32_t getSmlMd5ChildTableName(TAOS_SML_DATA_POINT* point, char* tableNa tMD5Final(&context); uint64_t digest1 = *(uint64_t*)(context.digest); uint64_t digest2 = *(uint64_t*)(context.digest + 8); - *tableNameLen = snprintf(tableName, *tableNameLen, - "t_%016"PRIx64"%016"PRIx64, digest1, digest2); + snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2); taosStringBuilderDestroy(&sb); - uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tableName); - return 0; -} - -static int32_t buildSmlChildTableName(TAOS_SML_DATA_POINT* point, SSmlLinesInfo* info) { - uDebug("SML:0x%"PRIx64" taos_sml_insert build child table name", info->id); - char childTableName[TSDB_TABLE_NAME_LEN]; - int32_t tableNameLen = TSDB_TABLE_NAME_LEN; - getSmlMd5ChildTableName(point, childTableName, &tableNameLen, info); - point->childTableName = calloc(1, tableNameLen+1); - strncpy(point->childTableName, childTableName, tableNameLen); - point->childTableName[tableNameLen] = '\0'; - return 0; -} - -static int32_t buildDataPointSchemas(TAOS_SML_DATA_POINT* points, int numPoint, SArray* stableSchemas, SSmlLinesInfo* info) { - int32_t code = 0; - SHashObj* sname2shema = taosHashInit(32, - taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - - for (int i = 0; i < numPoint; ++i) { - TAOS_SML_DATA_POINT* point = &points[i]; - size_t stableNameLen = strlen(point->stableName); - size_t* pStableIdx = taosHashGet(sname2shema, point->stableName, stableNameLen); - SSmlSTableSchema* pStableSchema = NULL; - size_t stableIdx = -1; - if (pStableIdx) { - pStableSchema= taosArrayGet(stableSchemas, *pStableIdx); - stableIdx = *pStableIdx; - } else { - SSmlSTableSchema schema; - strncpy(schema.sTableName, point->stableName, stableNameLen); - schema.sTableName[stableNameLen] = '\0'; - schema.fields = taosArrayInit(64, sizeof(SSchema)); - schema.tags = taosArrayInit(8, sizeof(SSchema)); - schema.tagHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - schema.fieldHash = taosHashInit(128, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - - pStableSchema = taosArrayPush(stableSchemas, &schema); - stableIdx = taosArrayGetSize(stableSchemas) - 1; - taosHashPut(sname2shema, schema.sTableName, stableNameLen, &stableIdx, sizeof(size_t)); - } - - for (int j = 0; j < point->tagNum; ++j) { - TAOS_SML_KV* tagKv = point->tags + j; - if (!point->childTableName) { - buildSmlChildTableName(point, info); - } - - code = buildSmlKvSchema(tagKv, pStableSchema->tagHash, pStableSchema->tags, info); - if (code != 0) { - uError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, tag key: %s", info->id, i, tagKv->key); - return code; - } - } - - //for Line Protocol tags may be omitted, add a tag with NULL value - if (point->tagNum == 0) { - if (!point->childTableName) { - buildSmlChildTableName(point, info); - } - char tagNullName[TSDB_COL_NAME_LEN] = {0}; - size_t nameLen = strlen(tsSmlTagNullName); - strncpy(tagNullName, tsSmlTagNullName, nameLen); - addEscapeCharToString(tagNullName, (int32_t)nameLen); - size_t* pTagNullIdx = taosHashGet(pStableSchema->tagHash, tagNullName, nameLen); - if (!pTagNullIdx) { - SSchema tagNull = {0}; - tagNull.type = TSDB_DATA_TYPE_NCHAR; - tagNull.bytes = TSDB_NCHAR_SIZE + VARSTR_HEADER_SIZE; - strncpy(tagNull.name, tagNullName, nameLen); - taosArrayPush(pStableSchema->tags, &tagNull); - size_t tagNullIdx = taosArrayGetSize(pStableSchema->tags) - 1; - taosHashPut(pStableSchema->tagHash, tagNull.name, nameLen, &tagNullIdx, sizeof(tagNullIdx)); - } - } - - for (int j = 0; j < point->fieldNum; ++j) { - TAOS_SML_KV* fieldKv = point->fields + j; - code = buildSmlKvSchema(fieldKv, pStableSchema->fieldHash, pStableSchema->fields, info); - if (code != 0) { - uError("SML:0x%"PRIx64" build data point schema failed. point no.: %d, tag key: %s", info->id, i, fieldKv->key); - return code; - } - } - - point->schemaIdx = (uint32_t)stableIdx; - } - - size_t numStables = taosArrayGetSize(stableSchemas); - for (int32_t i = 0; i < numStables; ++i) { - SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i); - taosHashCleanup(schema->tagHash); - taosHashCleanup(schema->fieldHash); - } - taosHashCleanup(sname2shema); - - uDebug("SML:0x%"PRIx64" build point schema succeed. num of super table: %zu", info->id, numStables); - for (int32_t i = 0; i < numStables; ++i) { - SSmlSTableSchema* schema = taosArrayGet(stableSchemas, i); - uDebug("\ttable name: %s, tags number: %zu, fields number: %zu", schema->sTableName, - taosArrayGetSize(schema->tags), taosArrayGetSize(schema->fields)); - } - + tags->uid = digest1; + uDebug("SML: child table name: %s", tags->childTableName); return 0; } static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[], SSchemaAction* action, bool* actionNeeded, SSmlLinesInfo* info) { - char fieldName[TSDB_COL_NAME_LEN] = {0}; - strcpy(fieldName, pointColField->name); - - size_t* pDbIndex = taosHashGet(dbAttrHash, fieldName, strlen(fieldName)); - if (pDbIndex) { - SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex); - assert(strcasecmp(dbAttr->name, pointColField->name) == 0); - if (pointColField->type != dbAttr->type) { - uError("SML:0x%"PRIx64" point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id, pointColField->name, - pointColField->type, dbAttr->type); - return TSDB_CODE_TSC_INVALID_VALUE; - } - - if (IS_VAR_DATA_TYPE(pointColField->type) && (pointColField->bytes > dbAttr->bytes)) { - if (isTag) { - action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE; - } else { - action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE; - } - memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo)); - memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN); - action->alterSTable.field = pointColField; - *actionNeeded = true; - } - } else { - if (isTag) { - action->action = SCHEMA_ACTION_ADD_TAG; - } else { - action->action = SCHEMA_ACTION_ADD_COLUMN; - } - memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo)); - memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN); - action->alterSTable.field = pointColField; - *actionNeeded = true; - } - if (*actionNeeded) { - uDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, fieldName, - action->action); - } +// char fieldName[TSDB_COL_NAME_LEN] = {0}; +// strcpy(fieldName, pointColField->name); +// +// size_t* pDbIndex = taosHashGet(dbAttrHash, fieldName, strlen(fieldName)); +// if (pDbIndex) { +// SSchema* dbAttr = taosArrayGet(dbAttrArray, *pDbIndex); +// assert(strcasecmp(dbAttr->name, pointColField->name) == 0); +// if (pointColField->type != dbAttr->type) { +// uError("SML:0x%"PRIx64" point type and db type mismatch. key: %s. point type: %d, db type: %d", info->id, pointColField->name, +// pointColField->type, dbAttr->type); +// return TSDB_CODE_TSC_INVALID_VALUE; +// } +// +// if (IS_VAR_DATA_TYPE(pointColField->type) && (pointColField->bytes > dbAttr->bytes)) { +// if (isTag) { +// action->action = SCHEMA_ACTION_CHANGE_TAG_SIZE; +// } else { +// action->action = SCHEMA_ACTION_CHANGE_COLUMN_SIZE; +// } +// memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo)); +// memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN); +// action->alterSTable.field = pointColField; +// *actionNeeded = true; +// } +// } else { +// if (isTag) { +// action->action = SCHEMA_ACTION_ADD_TAG; +// } else { +// action->action = SCHEMA_ACTION_ADD_COLUMN; +// } +// memset(&action->alterSTable, 0, sizeof(SAlterSTableActionInfo)); +// memcpy(action->alterSTable.sTableName, sTableName, TSDB_TABLE_NAME_LEN); +// action->alterSTable.field = pointColField; +// *actionNeeded = true; +// } +// if (*actionNeeded) { +// uDebug("SML:0x%" PRIx64 " generate schema action. column name: %s, action: %d", info->id, fieldName, +// action->action); +// } return 0; } -static int32_t buildColumnDescription(TAOS_SML_KV* field, - char* buf, int32_t bufSize, int32_t* outBytes) { +static int32_t buildColumnDescription(SSmlKv* field, char* buf, int32_t bufSize, int32_t* outBytes) { uint8_t type = field->type; char tname[TSDB_TABLE_NAME_LEN] = {0}; memcpy(tname, field->key, field->keyLen); if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { - int32_t bytes = field->length - VARSTR_HEADER_SIZE; - if (type == TSDB_DATA_TYPE_NCHAR) { - bytes = bytes/TSDB_NCHAR_SIZE; - } + int32_t bytes = field->valueLen; // todo int out = snprintf(buf, bufSize,"%s %s(%d)", tname,tDataTypes[field->type].name, bytes); *outBytes = out; } else { - int out = snprintf(buf, bufSize, "%s %s", - tname, tDataTypes[type].name); + int out = snprintf(buf, bufSize, "%s %s", tname, tDataTypes[type].name); *outBytes = out; } return 0; } - static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInfo* info) { int32_t code = 0; int32_t outBytes = 0; @@ -472,7 +294,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf int n = sprintf(result, "create stable %s (", action->createSTable.sTableName); char* pos = result + n; int freeBytes = capacity - n; - TAOS_SML_KV **kv = taosHashIterate(action->createSTable.fields, NULL); + SSmlKv **kv = taosHashIterate(action->createSTable.fields, NULL); while(kv){ buildColumnDescription(*kv, pos, freeBytes, &outBytes); pos += outBytes; freeBytes -= outBytes; @@ -523,168 +345,6 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf return code; } -static int32_t destroySmlSTableSchema(SSmlSTableSchema* schema) { - taosHashCleanup(schema->tagHash); - taosHashCleanup(schema->fieldHash); - taosArrayDestroy(&schema->tags); - taosArrayDestroy(&schema->fields); - return 0; -} - -static int32_t fillDbSchema(STableMeta* tableMeta, char* tableName, SSmlSTableSchema* schema, SSmlLinesInfo* info) { - schema->tags = taosArrayInit(8, sizeof(SSchema)); - schema->fields = taosArrayInit(64, sizeof(SSchema)); - schema->tagHash = taosHashInit(8, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - schema->fieldHash = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), true, false); - - tstrncpy(schema->sTableName, tableName, strlen(tableName)+1); - schema->precision = tableMeta->tableInfo.precision; - for (int i=0; itableInfo.numOfColumns; ++i) { - SSchema field; - tstrncpy(field.name, tableMeta->schema[i].name, strlen(tableMeta->schema[i].name)+1); - addEscapeCharToString(field.name, (int16_t)strlen(field.name)); - field.type = tableMeta->schema[i].type; - field.bytes = tableMeta->schema[i].bytes; - taosArrayPush(schema->fields, &field); - size_t fieldIndex = taosArrayGetSize(schema->fields) - 1; - taosHashPut(schema->fieldHash, field.name, strlen(field.name), &fieldIndex, sizeof(fieldIndex)); - } - - for (int i=0; itableInfo.numOfTags; ++i) { - int j = i + tableMeta->tableInfo.numOfColumns; - SSchema field; - tstrncpy(field.name, tableMeta->schema[j].name, strlen(tableMeta->schema[j].name)+1); - addEscapeCharToString(field.name, (int16_t)strlen(field.name)); - field.type = tableMeta->schema[j].type; - field.bytes = tableMeta->schema[j].bytes; - taosArrayPush(schema->tags, &field); - size_t tagIndex = taosArrayGetSize(schema->tags) - 1; - taosHashPut(schema->tagHash, field.name, strlen(field.name), &tagIndex, sizeof(tagIndex)); - } - uDebug("SML:0x%"PRIx64 " load table schema succeed. table name: %s, columns number: %d, tag number: %d, precision: %d", - info->id, tableName, tableMeta->tableInfo.numOfColumns, tableMeta->tableInfo.numOfTags, schema->precision); - return TSDB_CODE_SUCCESS; -} - -static int32_t getSuperTableMetaFromLocalCache(TAOS* taos, char* tableName, STableMeta** outTableMeta, SSmlLinesInfo* info) { - int32_t code = 0; - STableMeta* tableMeta = NULL; - - SSqlObj* pSql = calloc(1, sizeof(SSqlObj)); - if (pSql == NULL) { - uError("SML:0x%" PRIx64 " failed to allocate memory, reason:%s", info->id, strerror(errno)); - code = TSDB_CODE_TSC_OUT_OF_MEMORY; - return code; - } - pSql->pTscObj = taos; - pSql->signature = pSql; - pSql->fp = NULL; - - registerSqlObj(pSql); - char tableNameBuf[TSDB_TABLE_NAME_LEN + TS_BACKQUOTE_CHAR_SIZE] = {0}; - memcpy(tableNameBuf, tableName, strlen(tableName)); - SStrToken tableToken = {.z = tableNameBuf, .n = (uint32_t)strlen(tableName), .type = TK_ID}; - tGetToken(tableNameBuf, &tableToken.type); - bool dbIncluded = false; - // Check if the table name available or not - if (tscValidateName(&tableToken, true, &dbIncluded) != TSDB_CODE_SUCCESS) { - code = TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; - sprintf(pSql->cmd.payload, "table name is invalid"); - taosReleaseRef(tscObjRef, pSql->self); - return code; - } - - SName sname = {0}; - if ((code = tscSetTableFullName(&sname, &tableToken, pSql, dbIncluded)) != TSDB_CODE_SUCCESS) { - taosReleaseRef(tscObjRef, pSql->self); - return code; - } - - char fullTableName[TSDB_TABLE_FNAME_LEN] = {0}; - memset(fullTableName, 0, tListLen(fullTableName)); - tNameExtractFullName(&sname, fullTableName); - - size_t size = 0; - taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), fullTableName, strlen(fullTableName), NULL, (void**)&tableMeta, &size); - - STableMeta* stableMeta = tableMeta; - if (tableMeta != NULL && tableMeta->tableType == TSDB_CHILD_TABLE) { - taosHashGetCloneExt(UTIL_GET_TABLEMETA(pSql), tableMeta->sTableName, strlen(tableMeta->sTableName), NULL, - (void**)stableMeta, &size); - } - taosReleaseRef(tscObjRef, pSql->self); - - if (stableMeta != tableMeta) { - taosMemoryFree(tableMeta); - } - - if (stableMeta != NULL) { - if (outTableMeta != NULL) { - *outTableMeta = stableMeta; - } else { - taosMemoryFree(stableMeta); - } - return TSDB_CODE_SUCCESS; - } else { - return TSDB_CODE_TSC_NO_META_CACHED; - } -} - -static int32_t retrieveTableMeta(TAOS* taos, char* tableName, STableMeta** pTableMeta, SSmlLinesInfo* info) { - int32_t code = 0; - int32_t retries = 0; - STableMeta* tableMeta = NULL; - while (retries++ <= TSDB_MAX_REPLICA && tableMeta == NULL) { - STscObj* pObj = (STscObj*)taos; - if (pObj == NULL || pObj->signature != pObj) { - terrno = TSDB_CODE_TSC_DISCONNECTED; - return TSDB_CODE_TSC_DISCONNECTED; - } - - uDebug("SML:0x%" PRIx64 " retrieve table meta. super table name: %s", info->id, tableName); - code = getSuperTableMetaFromLocalCache(taos, tableName, &tableMeta, info); - if (code == TSDB_CODE_SUCCESS) { - uDebug("SML:0x%" PRIx64 " successfully retrieved table meta. super table name: %s", info->id, tableName); - break; - } else if (code == TSDB_CODE_TSC_NO_META_CACHED) { - char sql[256]; - snprintf(sql, 256, "describe %s", tableName); - TAOS_RES* res = taos_query(taos, sql); - code = taos_errno(res); - if (code != 0) { - uError("SML:0x%" PRIx64 " describe table failure. %s", info->id, taos_errstr(res)); - taos_free_result(res); - return code; - } - taos_free_result(res); - } else { - return code; - } - } - - if (tableMeta != NULL) { - *pTableMeta = tableMeta; - return TSDB_CODE_SUCCESS; - } else { - uError("SML:0x%" PRIx64 " failed to retrieve table meta. super table name: %s", info->id, tableName); - return TSDB_CODE_TSC_NO_META_CACHED; - } -} - -static int32_t loadTableSchemaFromDB(TAOS* taos, char* tableName, SSmlSTableSchema* schema, SSmlLinesInfo* info) { - int32_t code = 0; - STableMeta* tableMeta = NULL; - code = retrieveTableMeta(taos, tableName, &tableMeta, info); - if (code == TSDB_CODE_SUCCESS) { - assert(tableMeta != NULL); - fillDbSchema(tableMeta, tableName, schema, info); - taosMemoryFree(tableMeta); - tableMeta = NULL; - } - - return code; -} - static int32_t modifyDBSchemas(TAOS* taos, SSmlLinesInfo* info) { int32_t code = 0; @@ -706,7 +366,6 @@ static int32_t modifyDBSchemas(TAOS* taos, SSmlLinesInfo* info) { if (code == TSDB_CODE_TDB_INVALID_TABLE_ID) { SSchemaAction schemaAction = {0}; schemaAction.action = SCHEMA_ACTION_CREATE_STABLE; - memset(&schemaAction.createSTable, 0, sizeof(SCreateSTableActionInfo)); memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen); schemaAction.createSTable.tags = cTablePoints->tagHash; schemaAction.createSTable.fields = cTablePoints->fieldHash; @@ -728,7 +387,7 @@ static int32_t modifyDBSchemas(TAOS* taos, SSmlLinesInfo* info) { return 0; } -static int32_t applyDataPoints(TAOS* taos, SSmlLinesInfo* info) { +static int32_t applyDataPoints(SSmlLinesInfo* info) { int32_t code = TSDB_CODE_SUCCESS; TAOS_SML_DATA_POINT_TAGS** oneTable = taosHashIterate(info->childTables, NULL); @@ -746,43 +405,40 @@ static int32_t applyDataPoints(TAOS* taos, SSmlLinesInfo* info) { STableMeta** pMeta = taosHashGet(info->metaHashObj, tableData->sTableName, tableData->sTableNameLen); ASSERT (NULL != pMeta && NULL != *pMeta); (*pMeta)->vgId = vg.vgId; - (*pMeta)->uid = tableData->uid; - - smlBind(info->exec, tableData->tags, tableData->cols, *pMeta, info->msgBuf, info->msgLen); + (*pMeta)->uid = tableData->uid; // one table merge data block together according uid + code = smlBind(info->exec, tableData->tags, tableData->cols, *pMeta, info->msgBuf.buf, info->msgBuf.len); + if(code != TSDB_CODE_SUCCESS){ + return code; + } oneTable = taosHashIterate(info->childTables, oneTable); } smlBuildOutput(info->exec, info->pVgHash); launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true); - if(info->pRequest->code != TSDB_CODE_SUCCESS){ - - } info->affectedRows = taos_affected_rows(info->pRequest); - return code; + return info->pRequest->code; } -int tscSmlInsert(TAOS* taos, SSmlLinesInfo* info) { +int smlInsert(TAOS* taos, SSmlLinesInfo* info) { uDebug("SML:0x%"PRIx64" taos_sml_insert. number of super tables: %d", info->id, taosHashGetSize(info->superTables)); - int32_t code = TSDB_CODE_SUCCESS; - info->affectedRows = 0; uDebug("SML:0x%"PRIx64" modify db schemas", info->id); - code = modifyDBSchemas(taos, info); + int32_t code = modifyDBSchemas(taos, info); if (code != 0) { uError("SML:0x%"PRIx64" error change db schema : %s", info->id, tstrerror(code)); - goto clean_up; + return code; } uDebug("SML:0x%"PRIx64" apply data points", info->id); - code = applyDataPoints(taos, info); + code = applyDataPoints(info); if (code != 0) { uError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code)); + return code; } -clean_up: - return code; + return TSDB_CODE_SUCCESS; } //========================================================================= @@ -835,242 +491,307 @@ static void escapeSpecialCharacter(uint8_t field, const char **pos) { *pos = cur; } -bool isValidInteger(char *str) { - char *c = str; - if (*c != '+' && *c != '-' && !isdigit(*c)) { - return false; - } - c++; - while (*c != '\0') { - if (!isdigit(*c)) { - return false; - } - c++; - } - return true; -} - -bool isValidFloat(char *str) { - char *c = str; - uint8_t has_dot, has_exp, has_sign; - has_dot = 0; - has_exp = 0; - has_sign = 0; - - if (*c != '+' && *c != '-' && *c != '.' && !isdigit(*c)) { - return false; - } - if (*c == '.' && isdigit(*(c + 1))) { - has_dot = 1; - } - c++; - while (*c != '\0') { - if (!isdigit(*c)) { - switch (*c) { - case '.': { - if (!has_dot && !has_exp && isdigit(*(c + 1))) { - has_dot = 1; - } else { - return false; - } - break; - } - case 'e': - case 'E': { - if (!has_exp && isdigit(*(c - 1)) && - (isdigit(*(c + 1)) || - *(c + 1) == '+' || - *(c + 1) == '-')) { - has_exp = 1; - } else { - return false; - } - break; - } - case '+': - case '-': { - if (!has_sign && has_exp && isdigit(*(c + 1))) { - has_sign = 1; - } else { - return false; - } - break; - } - default: { - return false; - } - } - } - c++; - } //while - return true; -} - -static bool isInteger(char *pVal, uint16_t len, bool *has_sign) { - if (len <= 1) { - return false; - } - if (pVal[len - 1] == 'i') { - *has_sign = true; - return true; - } - if (pVal[len - 1] == 'u') { - *has_sign = false; - return true; - } - - return false; -} - -static bool isTinyInt(char *pVal, uint16_t len) { +static bool parseTinyInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { + const char *pVal = kvVal->value; + int32_t len = kvVal->valueLen; if (len <= 2) { return false; } - if (!strcasecmp(&pVal[len - 2], "i8")) { - //printf("Type is int8(%s)\n", pVal); + const char *signalPos = pVal + len - 2; + if (!strcasecmp(signalPos, "i8")) { + char *endptr = NULL; + int64_t result = strtoll(pVal, &endptr, 10); + if(endptr != signalPos){ // 78ri8 + *isValid = false; + buildInvalidDataMsg(msg, "invalid tiny int", endptr); + }else if(!IS_VALID_TINYINT(result)){ + *isValid = false; + buildInvalidDataMsg(msg, "tiny int out of range[-128,127]", endptr); + }else{ + kvVal->i = result; + *isValid = true; + } return true; } return false; } -static bool isTinyUint(char *pVal, uint16_t len) { +static bool parseTinyUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { + const char *pVal = kvVal->value; + int32_t len = kvVal->valueLen; if (len <= 2) { return false; } if (pVal[0] == '-') { return false; } - if (!strcasecmp(&pVal[len - 2], "u8")) { - //printf("Type is uint8(%s)\n", pVal); + const char *signalPos = pVal + len - 2; + if (!strcasecmp(signalPos, "u8")) { + char *endptr = NULL; + int64_t result = strtoll(pVal, &endptr, 10); + if(endptr != signalPos){ // 78ri8 + *isValid = false; + buildInvalidDataMsg(msg, "invalid unsigned tiny int", endptr); + }else if(!IS_VALID_UTINYINT(result)){ + *isValid = false; + buildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", endptr); + }else{ + kvVal->i = result; + *isValid = true; + } return true; } return false; } -static bool isSmallInt(char *pVal, uint16_t len) { +static bool parseSmallInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { + const char *pVal = kvVal->value; + int32_t len = kvVal->valueLen; if (len <= 3) { return false; } - if (!strcasecmp(&pVal[len - 3], "i16")) { - //printf("Type is int16(%s)\n", pVal); + const char *signalPos = pVal + len - 3; + if (!strcasecmp(signalPos, "i16")) { + char *endptr = NULL; + int64_t result = strtoll(pVal, &endptr, 10); + if(endptr != signalPos){ // 78ri8 + *isValid = false; + buildInvalidDataMsg(msg, "invalid small int", endptr); + }else if(!IS_VALID_SMALLINT(result)){ + *isValid = false; + buildInvalidDataMsg(msg, "small int our of range[-32768,32767]", endptr); + }else{ + kvVal->i = result; + *isValid = true; + } return true; } return false; } -static bool isSmallUint(char *pVal, uint16_t len) { +static bool parseSmallUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { + const char *pVal = kvVal->value; + int32_t len = kvVal->valueLen; if (len <= 3) { return false; } if (pVal[0] == '-') { return false; } - if (strcasecmp(&pVal[len - 3], "u16") == 0) { - //printf("Type is uint16(%s)\n", pVal); + const char *signalPos = pVal + len - 3; + if (strcasecmp(signalPos, "u16") == 0) { + char *endptr = NULL; + int64_t result = strtoll(pVal, &endptr, 10); + if(endptr != signalPos){ // 78ri8 + *isValid = false; + buildInvalidDataMsg(msg, "invalid unsigned small int", endptr); + }else if(!IS_VALID_USMALLINT(result)){ + *isValid = false; + buildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", endptr); + }else{ + kvVal->i = result; + *isValid = true; + } return true; } return false; } -static bool isInt(char *pVal, uint16_t len) { +static bool parseInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { + const char *pVal = kvVal->value; + int32_t len = kvVal->valueLen; if (len <= 3) { return false; } - if (strcasecmp(&pVal[len - 3], "i32") == 0) { - //printf("Type is int32(%s)\n", pVal); + const char *signalPos = pVal + len - 3; + if (strcasecmp(signalPos, "i32") == 0) { + char *endptr = NULL; + int64_t result = strtoll(pVal, &endptr, 10); + if(endptr != signalPos){ // 78ri8 + *isValid = false; + buildInvalidDataMsg(msg, "invalid int", endptr); + }else if(!IS_VALID_INT(result)){ + *isValid = false; + buildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", endptr); + }else{ + kvVal->i = result; + *isValid = true; + } return true; } return false; } -static bool isUint(char *pVal, uint16_t len) { +static bool parseUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { + const char *pVal = kvVal->value; + int32_t len = kvVal->valueLen; if (len <= 3) { return false; } if (pVal[0] == '-') { return false; } - if (strcasecmp(&pVal[len - 3], "u32") == 0) { - //printf("Type is uint32(%s)\n", pVal); + const char *signalPos = pVal + len - 3; + if (strcasecmp(signalPos, "u32") == 0) { + char *endptr = NULL; + int64_t result = strtoll(pVal, &endptr, 10); + if(endptr != signalPos){ // 78ri8 + *isValid = false; + buildInvalidDataMsg(msg, "invalid unsigned int", endptr); + }else if(!IS_VALID_UINT(result)){ + *isValid = false; + buildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", endptr); + }else{ + kvVal->i = result; + *isValid = true; + } return true; } return false; } -static bool isBigInt(char *pVal, uint16_t len) { - if (len <= 3) { - return false; - } - if (strcasecmp(&pVal[len - 3], "i64") == 0) { - //printf("Type is int64(%s)\n", pVal); +static bool parseBigInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { + const char *pVal = kvVal->value; + int32_t len = kvVal->valueLen; + if (len > 3 && strcasecmp(pVal + len - 3, "i64") == 0) { + char *endptr = NULL; + int64_t result = strtoll(pVal, &endptr, 10); + if(endptr != pVal + len - 3){ // 78ri8 + *isValid = false; + }else if(!IS_VALID_BIGINT(result)){ + *isValid = false; + }else{ + kvVal->i = result; + *isValid = true; + } + return true; + }else if (len > 1 && pVal[len - 1] == 'i') { + char *endptr = NULL; + int64_t result = strtoll(pVal, &endptr, 10); + if(endptr != pVal + len - 1){ // 78ri8 + *isValid = false; + buildInvalidDataMsg(msg, "invalid big int", endptr); + }else if(!IS_VALID_BIGINT(result)){ + *isValid = false; + buildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", endptr); + }else{ + kvVal->i = result; + *isValid = true; + } return true; } return false; } -static bool isBigUint(char *pVal, uint16_t len) { +static bool parseBigUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { + const char *pVal = kvVal->value; + int32_t len = kvVal->valueLen; if (len <= 3) { return false; } if (pVal[0] == '-') { return false; } - if (strcasecmp(&pVal[len - 3], "u64") == 0) { - //printf("Type is uint64(%s)\n", pVal); + const char *signalPos = pVal + len - 3; + if (strcasecmp(signalPos, "u64") == 0) { + char *endptr = NULL; + uint64_t result = strtoull(pVal, &endptr, 10); + if(endptr != signalPos){ // 78ri8 + *isValid = false; + buildInvalidDataMsg(msg, "invalid unsigned big int", endptr); + }else if(!IS_VALID_UBIGINT(result)){ + *isValid = false; + buildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", endptr); + }else{ + kvVal->u = result; + *isValid = true; + } return true; } return false; } -static bool isFloat(char *pVal, uint16_t len) { +static bool parseFloat(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { + const char *pVal = kvVal->value; + int32_t len = kvVal->valueLen; + char *endptr = NULL; + float result = strtof(pVal, &endptr); + if(endptr == pVal + len && IS_VALID_FLOAT(result)){ // 78 + kvVal->f = result; + *isValid = true; + return true; + } + + if (len > 3 && len f = result; + *isValid = true; + } + return true; + } + return false; +} + +static bool parseDouble(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { + const char *pVal = kvVal->value; + int32_t len = kvVal->valueLen; if (len <= 3) { return false; } - if (strcasecmp(&pVal[len - 3], "f32") == 0) { - //printf("Type is float(%s)\n", pVal); + const char *signalPos = pVal + len - 3; + if (len d = result; + *isValid = true; + } return true; } return false; } -static bool isDouble(char *pVal, uint16_t len) { - if (len <= 3) { - return false; - } - if (strcasecmp(&pVal[len - 3], "f64") == 0) { - //printf("Type is double(%s)\n", pVal); - return true; - } - return false; -} - -static bool isBool(char *pVal, uint16_t len, bool *bVal) { - if ((len == 1) && !strcasecmp(&pVal[len - 1], "t")) { +static bool parseBool(SSmlKv *kvVal) { + const char *pVal = kvVal->value; + int32_t len = kvVal->valueLen; + if ((len == 1) && pVal[len - 1] == 't') { //printf("Type is bool(%c)\n", pVal[len - 1]); - *bVal = true; + kvVal->i = true; return true; } - if ((len == 1) && !strcasecmp(&pVal[len - 1], "f")) { + if ((len == 1) && pVal[len - 1] == 'f') { //printf("Type is bool(%c)\n", pVal[len - 1]); - *bVal = false; + kvVal->i = false; return true; } - if((len == 4) && !strcasecmp(&pVal[len - 4], "true")) { + if((len == 4) && !strcasecmp(pVal, "true")) { //printf("Type is bool(%s)\n", &pVal[len - 4]); - *bVal = true; + kvVal->i = true; return true; } - if((len == 5) && !strcasecmp(&pVal[len - 5], "false")) { + if((len == 5) && !strcasecmp(pVal, "false")) { //printf("Type is bool(%s)\n", &pVal[len - 5]); - *bVal = false; + kvVal->i = false; return true; } return false; } -static bool isBinary(char *pVal, uint16_t len) { +static bool isBinary(const char *pVal, uint16_t len) { //binary: "abc" if (len < 2) { return false; @@ -1083,7 +804,7 @@ static bool isBinary(char *pVal, uint16_t len) { return false; } -static bool isNchar(char *pVal, uint16_t len) { +static bool isNchar(const char *pVal, uint16_t len) { //nchar: L"abc" if (len < 3) { return false; @@ -1095,266 +816,97 @@ static bool isNchar(char *pVal, uint16_t len) { return false; } -static bool convertStrToNumber(TAOS_SML_KV *pVal, char *str, SSmlLinesInfo* info) { - errno = 0; - uint8_t type = pVal->type; - int16_t length = pVal->length; - int64_t val_s = 0; - uint64_t val_u = 0; - double val_d = 0.0; - - strntolower_s(str, str, (int32_t)strlen(str)); - if (IS_FLOAT_TYPE(type)) { - val_d = strtod(str, NULL); - } else { - if (IS_SIGNED_NUMERIC_TYPE(type)) { - val_s = strtoll(str, NULL, 10); - } else { - val_u = strtoull(str, NULL, 10); - } - } - - if (errno == ERANGE) { - uError("SML:0x%"PRIx64" Convert number(%s) out of range", info->id, str); - return false; - } - - switch (type) { - case TSDB_DATA_TYPE_TINYINT: - if (!IS_VALID_TINYINT(val_s)) { - return false; - } - pVal->value = calloc(length, 1); - *(int8_t *)(pVal->value) = (int8_t)val_s; - break; - case TSDB_DATA_TYPE_UTINYINT: - if (!IS_VALID_UTINYINT(val_u)) { - return false; - } - pVal->value = calloc(length, 1); - *(uint8_t *)(pVal->value) = (uint8_t)val_u; - break; - case TSDB_DATA_TYPE_SMALLINT: - if (!IS_VALID_SMALLINT(val_s)) { - return false; - } - pVal->value = calloc(length, 1); - *(int16_t *)(pVal->value) = (int16_t)val_s; - break; - case TSDB_DATA_TYPE_USMALLINT: - if (!IS_VALID_USMALLINT(val_u)) { - return false; - } - pVal->value = calloc(length, 1); - *(uint16_t *)(pVal->value) = (uint16_t)val_u; - break; - case TSDB_DATA_TYPE_INT: - if (!IS_VALID_INT(val_s)) { - return false; - } - pVal->value = calloc(length, 1); - *(int32_t *)(pVal->value) = (int32_t)val_s; - break; - case TSDB_DATA_TYPE_UINT: - if (!IS_VALID_UINT(val_u)) { - return false; - } - pVal->value = calloc(length, 1); - *(uint32_t *)(pVal->value) = (uint32_t)val_u; - break; - case TSDB_DATA_TYPE_BIGINT: - if (!IS_VALID_BIGINT(val_s)) { - return false; - } - pVal->value = calloc(length, 1); - *(int64_t *)(pVal->value) = (int64_t)val_s; - break; - case TSDB_DATA_TYPE_UBIGINT: - if (!IS_VALID_UBIGINT(val_u)) { - return false; - } - pVal->value = calloc(length, 1); - *(uint64_t *)(pVal->value) = (uint64_t)val_u; - break; - case TSDB_DATA_TYPE_FLOAT: - if (!IS_VALID_FLOAT(val_d)) { - return false; - } - pVal->value = calloc(length, 1); - *(float *)(pVal->value) = (float)val_d; - break; - case TSDB_DATA_TYPE_DOUBLE: - if (!IS_VALID_DOUBLE(val_d)) { - return false; - } - pVal->value = calloc(length, 1); - *(double *)(pVal->value) = (double)val_d; - break; - default: - return false; - } - return true; -} -//len does not include '\0' from value. -bool convertSmlValueType(TAOS_SML_KV *pVal, char *value, - uint16_t len, SSmlLinesInfo* info, bool isTag) { - if (len <= 0) { - return false; - } - - //convert tags value to Nchar - if (isTag) { - pVal->type = TSDB_DATA_TYPE_NCHAR; - pVal->length = len; - pVal->value = calloc(pVal->length, 1); - memcpy(pVal->value, value, pVal->length); - return true; - } - - //integer number - bool has_sign; - if (isInteger(value, len, &has_sign)) { - pVal->type = has_sign ? TSDB_DATA_TYPE_BIGINT : TSDB_DATA_TYPE_UBIGINT; - pVal->length = (int16_t)tDataTypes[pVal->type].bytes; - value[len - 1] = '\0'; - if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) { - return false; - } - return true; - } - if (isTinyInt(value, len)) { - pVal->type = TSDB_DATA_TYPE_TINYINT; - pVal->length = (int16_t)tDataTypes[pVal->type].bytes; - value[len - 2] = '\0'; - if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) { - return false; - } - return true; - } - if (isTinyUint(value, len)) { - pVal->type = TSDB_DATA_TYPE_UTINYINT; - pVal->length = (int16_t)tDataTypes[pVal->type].bytes; - value[len - 2] = '\0'; - if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) { - return false; - } - return true; - } - if (isSmallInt(value, len)) { - pVal->type = TSDB_DATA_TYPE_SMALLINT; - pVal->length = (int16_t)tDataTypes[pVal->type].bytes; - value[len - 3] = '\0'; - if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) { - return false; - } - return true; - } - if (isSmallUint(value, len)) { - pVal->type = TSDB_DATA_TYPE_USMALLINT; - pVal->length = (int16_t)tDataTypes[pVal->type].bytes; - value[len - 3] = '\0'; - if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) { - return false; - } - return true; - } - if (isInt(value, len)) { - pVal->type = TSDB_DATA_TYPE_INT; - pVal->length = (int16_t)tDataTypes[pVal->type].bytes; - value[len - 3] = '\0'; - if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) { - return false; - } - return true; - } - if (isUint(value, len)) { - pVal->type = TSDB_DATA_TYPE_UINT; - pVal->length = (int16_t)tDataTypes[pVal->type].bytes; - value[len - 3] = '\0'; - if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) { - return false; - } - return true; - } - if (isBigInt(value, len)) { - pVal->type = TSDB_DATA_TYPE_BIGINT; - pVal->length = (int16_t)tDataTypes[pVal->type].bytes; - value[len - 3] = '\0'; - if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) { - return false; - } - return true; - } - if (isBigUint(value, len)) { - pVal->type = TSDB_DATA_TYPE_UBIGINT; - pVal->length = (int16_t)tDataTypes[pVal->type].bytes; - value[len - 3] = '\0'; - if (!isValidInteger(value) || !convertStrToNumber(pVal, value, info)) { - return false; - } - return true; - } - //floating number - if (isFloat(value, len)) { +static bool convertSmlValue(SSmlKv *pVal, SMsgBuf *msg) { + // put high probability matching type first + bool isValid = false; + if (parseFloat(pVal, &isValid, msg)) { + if(!isValid) return false; pVal->type = TSDB_DATA_TYPE_FLOAT; pVal->length = (int16_t)tDataTypes[pVal->type].bytes; - value[len - 3] = '\0'; - if (!isValidFloat(value) || !convertStrToNumber(pVal, value, info)) { - return false; - } - return true; - } - if (isDouble(value, len)) { - pVal->type = TSDB_DATA_TYPE_DOUBLE; - pVal->length = (int16_t)tDataTypes[pVal->type].bytes; - value[len - 3] = '\0'; - if (!isValidFloat(value) || !convertStrToNumber(pVal, value, info)) { - return false; - } return true; } //binary - if (isBinary(value, len)) { + if (isBinary(pVal->value, pVal->valueLen)) { pVal->type = TSDB_DATA_TYPE_BINARY; - pVal->length = len - 2; - pVal->value = calloc(pVal->length, 1); - //copy after " - memcpy(pVal->value, value + 1, pVal->length); + pVal->length = pVal->valueLen - 2; + pVal->valueLen -= 2; + pVal->value = pVal->value++; return true; } //nchar - if (isNchar(value, len)) { + if (isNchar(pVal->value, pVal->valueLen)) { pVal->type = TSDB_DATA_TYPE_NCHAR; - pVal->length = len - 3; - pVal->value = calloc(pVal->length, 1); - //copy after L" - memcpy(pVal->value, value + 2, pVal->length); + pVal->length = pVal->valueLen - 3; + pVal->value = pVal->value+2; + return true; + } + if (parseDouble(pVal, &isValid, msg)) { + if(!isValid) return false; + pVal->type = TSDB_DATA_TYPE_DOUBLE; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + return true; } //bool - bool bVal; - if (isBool(value, len, &bVal)) { + if (parseBool(pVal)) { pVal->type = TSDB_DATA_TYPE_BOOL; pVal->length = (int16_t)tDataTypes[pVal->type].bytes; - pVal->value = calloc(pVal->length, 1); - memcpy(pVal->value, &bVal, pVal->length); return true; } - //Handle default(no appendix) type as DOUBLE - if (isValidInteger(value) || isValidFloat(value)) { - pVal->type = TSDB_DATA_TYPE_DOUBLE; + if (parseTinyInt(pVal, &isValid, msg)) { + if(!isValid) return false; + pVal->type = TSDB_DATA_TYPE_TINYINT; pVal->length = (int16_t)tDataTypes[pVal->type].bytes; - if (!convertStrToNumber(pVal, value, info)) { - return false; - } return true; } + if (parseTinyUint(pVal, &isValid, msg)) { + if(!isValid) return false; + pVal->type = TSDB_DATA_TYPE_UTINYINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + return true; + } + if (parseSmallInt(pVal, &isValid, msg)) { + if(!isValid) return false; + pVal->type = TSDB_DATA_TYPE_SMALLINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + return true; + } + if (parseSmallUint(pVal, &isValid, msg)) { + if(!isValid) return false; + pVal->type = TSDB_DATA_TYPE_USMALLINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + return true; + } + if (parseInt(pVal, &isValid, msg)) { + if(!isValid) return false; + pVal->type = TSDB_DATA_TYPE_INT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + return true; + } + if (parseUint(pVal, &isValid, msg)) { + if(!isValid) return false; + pVal->type = TSDB_DATA_TYPE_UINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + return true; + } + if (parseBigInt(pVal, &isValid, msg)) { + if(!isValid) return false; + pVal->type = TSDB_DATA_TYPE_BIGINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + return true; + } + if (parseBigUint(pVal, &isValid, msg)) { + if(!isValid) return false; + pVal->type = TSDB_DATA_TYPE_UBIGINT; + pVal->length = (int16_t)tDataTypes[pVal->type].bytes; + return true; + } + + buildInvalidDataMsg(msg, "invalid data", pVal->value); return false; } - - bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) { char *val = NULL; val = taosHashGet(pHash, key, strlen(key)); @@ -1369,21 +921,6 @@ bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) { return false; } -//Table name can only contain digits(0-9),alphebet(a-z),underscore(_) -int32_t isValidChildTableName(const char *pTbName, int16_t len, SSmlLinesInfo* info) { - if (len > TSDB_TABLE_NAME_LEN - 1) { - uError("SML:0x%"PRIx64" child table name cannot exceeds %d characters", info->id, TSDB_TABLE_NAME_LEN - 1); - return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; - } - const char *cur = pTbName; - for (int i = 0; i < len; ++i) { - if(!isdigit(cur[i]) && !isalpha(cur[i]) && (cur[i] != '_')) { - return TSDB_CODE_TSC_LINE_SYNTAX_ERROR; - } - } - return TSDB_CODE_SUCCESS; -} - int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){ if(!sql) return TSDB_CODE_SML_INVALID_DATA; while (*sql != '\0') { // jump the space at the begining @@ -1428,8 +965,12 @@ int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){ } if(!elements->cols) return TSDB_CODE_SML_INVALID_DATA; + bool isInQuote = false; while (*sql != '\0') { - if(*sql == SPACE && *(sql - 1) != SLASH) { + if(*sql == QUOTE && *(sql - 1) != SLASH){ + isInQuote = !isInQuote; + } + if(!isInQuote && *sql == SPACE && *(sql - 1) != SLASH) { break; } sql++; @@ -1444,12 +985,27 @@ int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){ } sql++; } + if(elements->timestamp){ + elements->timestampLen = sql - elements->timestamp; + } return TSDB_CODE_SUCCESS; } -int32_t parseSmlKV(const char* data, int32_t len, SArray *cols, bool isTag){ +bool parseSmlCols(const char* data, int32_t len, SArray *cols, bool isTag, SMsgBuf *msg){ + if(isTag && len == 0){ + SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1); + kv->key = TAG; + kv->keyLen = TAGNAMELEN; + kv->value = TAG; + kv->valueLen = TAGNAMELEN; + kv->type = TSDB_DATA_TYPE_NCHAR; + if(cols) taosArrayPush(cols, &kv); + return true; + } + for(int i = 0; i < len; i++){ + // parse key const char *key = data + i; int32_t keyLen = 0; while(i < len){ @@ -1459,23 +1015,27 @@ int32_t parseSmlKV(const char* data, int32_t len, SArray *cols, bool isTag){ } i++; } - if(keyLen == 0){ + if(keyLen == 0 || keyLen >= TSDB_COL_NAME_LEN){ + buildInvalidDataMsg(msg, "invalid key or key is too long than 64", key); return TSDB_CODE_SML_INVALID_DATA; } + // parse value i++; const char *value = data + i; - int32_t valueLen = 0; while(i < len){ if(data[i] == COMMA && i > 0 && data[i-1] != SLASH){ - valueLen = data + i - value; break; } i++; } + int32_t valueLen = data + i - value; if(valueLen == 0){ + buildInvalidDataMsg(msg, "invalid value", value); return TSDB_CODE_SML_INVALID_DATA; } + + // add kv to SSmlKv SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1); kv->key = key; kv->keyLen = keyLen; @@ -1483,9 +1043,15 @@ int32_t parseSmlKV(const char* data, int32_t len, SArray *cols, bool isTag){ kv->valueLen = valueLen; if(isTag){ kv->type = TSDB_DATA_TYPE_NCHAR; + }else{ + if(!convertSmlValue(kv, msg)){ + return TSDB_CODE_SML_INVALID_DATA; + } } + if(cols) taosArrayPush(cols, &kv); } + return TSDB_CODE_SUCCESS; } @@ -1526,13 +1092,13 @@ static int64_t getTimeStampNow(int32_t precision) { } } -static int32_t isValidateTimeStamp(const char *pVal, int32_t len) { +static bool isValidateTimeStamp(const char *pVal, int32_t len) { for (int i = 0; i < len; ++i) { if (!isdigit(pVal[i])) { - return TSDB_CODE_TSC_INVALID_TIME_STAMP; + return false; } } - return TSDB_CODE_SUCCESS; + return true; } static int32_t getTsType(int32_t len) { @@ -1541,47 +1107,53 @@ static int32_t getTsType(int32_t len) { } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) { return TSDB_TIME_PRECISION_MILLI_DIGITS; } else { - return TSDB_CODE_TSC_INVALID_TIME_STAMP; + return -1; } } -static int32_t parseSmlTS(const char* data, SArray *tags, int8_t tsType, SMLProtocolType protocolType){ - int64_t *ts = taosMemoryCalloc(1, sizeof(int64_t)); +static int32_t parseSmlTS(const char* data, int32_t len, SArray *tags, SSmlLinesInfo* info){ + int64_t ts = 0; if(data == NULL){ - if(protocolType == TSDB_SML_LINE_PROTOCOL){ - *ts = getTimeStampNow(tsType); - }else{ - goto cleanup; + if(info->protocol != TSDB_SML_LINE_PROTOCOL){ + buildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL); + return TSDB_CODE_TSC_INVALID_TIME_STAMP; } + ts = getTimeStampNow(info->tsType); }else{ - int32_t len = strlen(data); int ret = isValidateTimeStamp(data, len); if(!ret){ - goto cleanup; + buildInvalidDataMsg(&info->msgBuf, "timestamp must be digit", data); + return TSDB_CODE_TSC_INVALID_TIME_STAMP; } - if(protocolType != TSDB_SML_LINE_PROTOCOL){ + int32_t tsType = -1; + if(info->protocol != TSDB_SML_LINE_PROTOCOL){ tsType = getTsType(len); - if (tsType == TSDB_CODE_TSC_INVALID_TIME_STAMP) { - goto cleanup; + if (tsType == -1) { + buildInvalidDataMsg(&info->msgBuf, "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data); + return TSDB_CODE_TSC_INVALID_TIME_STAMP; } + }else{ + tsType = info->tsType; } - *ts = getTimeStampValue(data, tsType); - if(*ts == -1){ - goto cleanup; + ts = getTimeStampValue(data, tsType); + if(ts == -1){ + buildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data); + return TSDB_CODE_TSC_INVALID_TIME_STAMP; } } SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1); - kv->value = (const char*)ts; - kv->valueLen = sizeof(int64_t); + if(!kv){ + return TSDB_CODE_OUT_OF_MEMORY; + } + + kv->key = TS; + kv->keyLen = TSNAMELEN; + kv->i = ts; kv->type = TSDB_DATA_TYPE_TIMESTAMP; kv->length = (int16_t)tDataTypes[kv->type].bytes; if(tags) taosArrayPush(tags, &kv); return TSDB_CODE_SUCCESS; - -cleanup: - taosMemoryFree(ts); - return TSDB_CODE_TSC_INVALID_TIME_STAMP; } //int32_t parseSmlCols(const char* data, SArray *cols){ @@ -1627,19 +1199,21 @@ cleanup: // return TSDB_CODE_SUCCESS; //} -void updateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){ +bool updateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SMsgBuf *msg){ if(tags){ for (int i = 0; i < taosArrayGetSize(tags); ++i) { SSmlKv *kv = taosArrayGetP(tags, i); + ASSERT(kv->type == TSDB_DATA_TYPE_NCHAR); + SSmlKv **value = taosHashGet(tableMeta->tagHash, kv->key, kv->keyLen); if(value){ - if(kv->type != (*value)->type){ - // todo + ASSERT((*value)->type == TSDB_DATA_TYPE_NCHAR); + if(kv->valueLen > (*value)->valueLen){ // tags type is nchar + *value = kv; } }else{ taosHashPut(tableMeta->tagHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); } - } } @@ -1649,7 +1223,14 @@ void updateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){ SSmlKv **value = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen); if(value){ if(kv->type != (*value)->type){ - // todo + buildInvalidDataMsg(msg, "the type is not the same like before", kv->key); + return false; + }else{ + if(IS_VAR_DATA_TYPE(kv->type)){ // update string len, if bigger + if(kv->valueLen > (*value)->valueLen){ + *value = kv; + } + } } }else{ taosHashPut(tableMeta->fieldHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); @@ -1687,21 +1268,33 @@ static int32_t smlParseLine(const char* sql, SSmlLinesInfo* info) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } - parseSmlTS(elements.timestamp, cols, info->tsType); - ret = parseSmlCols(elements.cols, elements.colsLen, cols, false); + ret = parseSmlTS(elements.timestamp, elements.timestampLen, cols, info); if(ret != TSDB_CODE_SUCCESS){ return ret; } + ret = parseSmlCols(elements.cols, elements.colsLen, cols, false, &info->msgBuf); + if(ret != TSDB_CODE_SUCCESS){ + return ret; + } + if(taosArrayGetSize(cols) > TSDB_MAX_COLUMNS){ + buildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL); + return TSDB_CODE_SML_INVALID_DATA; + } TAOS_SML_DATA_POINT_TAGS** oneTable = taosHashGet(info->childTables, elements.measure, elements.measureTagsLen); if(oneTable){ SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen); ASSERT(tableMeta); - updateMeta(*tableMeta, NULL, cols); // update meta - + ret = updateMeta(*tableMeta, NULL, cols, &info->msgBuf); // update meta + if(!ret){ + return TSDB_CODE_SML_INVALID_DATA; + } taosArrayPush((*oneTable)->cols, &cols); }else{ TAOS_SML_DATA_POINT_TAGS *tag = taosMemoryCalloc(sizeof(TAOS_SML_DATA_POINT_TAGS), 1); + if(!tag){ + return TSDB_CODE_OUT_OF_MEMORY; + } tag->cols = taosArrayInit(16, POINTER_BYTES); if (tag->cols == NULL) { uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id); @@ -1709,19 +1302,37 @@ static int32_t smlParseLine(const char* sql, SSmlLinesInfo* info) { } taosArrayPush(tag->cols, &cols); + tag->colsColumn = taosArrayInit(16, POINTER_BYTES); + if (tag->cols == NULL) { + uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + tag->tags = taosArrayInit(16, POINTER_BYTES); if (tag->tags == NULL) { uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - ret = parseSmlTags(elements.tags, elements.tagsLen, tag->tags); + ret = parseSmlCols(elements.tags, elements.tagsLen, tag->tags, true, &info->msgBuf); if(ret != TSDB_CODE_SUCCESS){ return ret; } + if(taosArrayGetSize(tag->tags) > TSDB_MAX_TAGS){ + buildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL); + return TSDB_CODE_SML_INVALID_DATA; + } + + tag->sTableName = elements.measure; + tag->sTableNameLen = elements.measureLen; + buildSmlChildTableName(tag); + SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen); if(tableMeta){ // update meta - updateMeta(*tableMeta, tag->tags, cols); + ret = updateMeta(*tableMeta, tag->tags, cols, &info->msgBuf); + if(!ret){ + return TSDB_CODE_SML_INVALID_DATA; + } }else{ SSmlSTableMeta* meta = taosMemoryCalloc(sizeof(SSmlSTableMeta), 1); insertMeta(meta, tag->tags, cols); @@ -1771,9 +1382,8 @@ static SSmlLinesInfo* smlBuildInfo(TAOS* taos, SRequestObj* request, SMLProtocol goto cleanup; } info->pRequest = request; - info->msgBuf = info->pRequest->msgBuf; - info->msgLen = ERROR_MSG_BUF_DEFAULT_SIZE; - + info->msgBuf.buf = info->pRequest->msgBuf; + info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE; info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false); info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false); @@ -1868,7 +1478,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr SRequestObj* request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT); switch (protocol) { - case TSDB_SML_LINE_PROTOCOL: + case TSDB_SML_LINE_PROTOCOL:{ int32_t tsType = convertPrecisionType(precision); if(tsType == -1){ request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE; @@ -1877,6 +1487,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr code = sml_insert_lines(taos, request, lines, numLines, protocol, tsType); break; + } case TSDB_SML_TELNET_PROTOCOL: //code = taos_insert_telnet_lines(taos, lines, numLines, protocol, tsType, &affected_rows); break; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 0e87d8611e..54e05f9264 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1539,7 +1539,7 @@ static int32_t smlBoundColumns(SArray *cols, SParsedDataColInfo* pColList, SSche col_id_t lastColIdx = -1; // last column found for (int i = 0; i < taosArrayGetSize(cols); ++i) { SSmlKv *kv = taosArrayGetP(cols, i); - SToken sToken = {.n=kv->keyLen, .z=kv->key}; + SToken sToken = {.n=kv->keyLen, .z=(char*)kv->key}; col_id_t t = lastColIdx + 1; col_id_t index = findCol(&sToken, t, nCols, pSchema); if (index < 0 && t > 0) { @@ -1596,18 +1596,17 @@ static int32_t smlBoundColumns(SArray *cols, SParsedDataColInfo* pColList, SSche return TSDB_CODE_SUCCESS; } -static int32_t smlParseTags(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDataColInfo* tags, SSchema* pSchema, SVCreateTbReq *createTblReq) { +static int32_t smlParseTags(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDataColInfo* tags, SSchema* pSchema, SVCreateTbReq *createTblReq, SMsgBuf *msg) { if (tdInitKVRowBuilder(tagsBuilder) < 0) { return TSDB_CODE_TSC_OUT_OF_MEMORY; } SKvParam param = {.builder = tagsBuilder}; for (int i = 0; i < tags->numOfBound; ++i) { - SSchema* pTagSchema = &pSchema[tags->boundColumns[i] - 1]; // colId starts with 1 param.schema = pTagSchema; SSmlKv *kv = taosArrayGetP(cols, i); - KvRowAppend(NULL, kv->value, kv->valueLen, ¶m) ; + KvRowAppend(msg, kv->value, kv->valueLen, ¶m) ; } @@ -1630,18 +1629,33 @@ int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta SmlExecHandle *smlHandle = (SmlExecHandle *)handle; SSchema* pTagsSchema = getTableTagSchema(pTableMeta); - smlBoundColumns(tags, &smlHandle->tags, pTagsSchema); - smlParseTags(tags, &smlHandle->tagsBuilder, &smlHandle->tags, pTagsSchema, &smlHandle->createTblReq); + setBoundColumnInfo(&smlHandle->tags, pTagsSchema, getNumOfTags(pTableMeta)); + int ret = smlBoundColumns(tags, &smlHandle->tags, pTagsSchema); + if(ret != TSDB_CODE_SUCCESS){ + buildInvalidOperationMsg(&pBuf, "bound tags error"); + return ret; + } + ret = smlParseTags(tags, &smlHandle->tagsBuilder, &smlHandle->tags, pTagsSchema, &smlHandle->createTblReq, &pBuf); + if(ret != TSDB_CODE_SUCCESS){ + return ret; + } STableDataBlocks* pDataBlock = NULL; - getDataBlockFromList(smlHandle->pBlockHash, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, + ret = getDataBlockFromList(smlHandle->pBlockHash, pTableMeta->uid, TSDB_DEFAULT_PAYLOAD_SIZE, sizeof(SSubmitBlk), getTableInfo(pTableMeta).rowSize, pTableMeta, &pDataBlock, NULL, &smlHandle->createTblReq); + if(ret != TSDB_CODE_SUCCESS){ + buildInvalidOperationMsg(&pBuf, "create data block error"); + return ret; + } SSchema* pSchema = getTableColumnSchema(pTableMeta); - smlBoundColumns(taosArrayGetP(cols, 0), &pDataBlock->boundColumnInfo, pSchema); - + ret = smlBoundColumns(taosArrayGetP(cols, 0), &pDataBlock->boundColumnInfo, pSchema); + if(ret != TSDB_CODE_SUCCESS){ + buildInvalidOperationMsg(&pBuf, "bound cols error"); + return ret; + } int32_t extendedRowSize = getExtendedRowSize(pDataBlock); SParsedDataColInfo* spd = &pDataBlock->boundColumnInfo; SRowBuilder* pBuilder = &pDataBlock->rowBuilder; @@ -1649,8 +1663,11 @@ int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta initRowBuilder(&pDataBlock->rowBuilder, pDataBlock->pTableMeta->sversion, &pDataBlock->boundColumnInfo); - allocateMemForSize(pDataBlock, extendedRowSize * rowNum); - + ret = allocateMemForSize(pDataBlock, extendedRowSize * rowNum); + if(ret != TSDB_CODE_SUCCESS){ + buildInvalidOperationMsg(&pBuf, "allocate memory error"); + return ret; + } for (int32_t r = 0; r < rowNum; ++r) { STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header tdSRowResetBuf(pBuilder, row);