diff --git a/include/client/taos.h b/include/client/taos.h index 55deee4fad..fa27eb2459 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -199,7 +199,7 @@ DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr); #endif DLL_EXPORT int taos_load_table_info(TAOS *taos, const char *tableNameList); -DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision); +DLL_EXPORT TAOS_RES *taos_schemaless_insert(TAOS *taos, char *lines[], int numLines, int protocol, int precision, bool dataFormat); /* --------------------------TMQ INTERFACE------------------------------- */ diff --git a/include/libs/parser/parser.h b/include/libs/parser/parser.h index 248a6c1237..f608e38226 100644 --- a/include/libs/parser/parser.h +++ b/include/libs/parser/parser.h @@ -93,12 +93,12 @@ int32_t qBindStmtSingleColValue(void *pBlock, TAOS_BIND_v2 *bind, char *msgBuf, int32_t qBuildStmtColFields(void *pDataBlock, int32_t *fieldNum, TAOS_FIELD** fields); int32_t qBuildStmtTagFields(void *pBlock, void *boundTags, int32_t *fieldNum, TAOS_FIELD** fields); int32_t qBindStmtTagsValue(void *pBlock, void *boundTags, int64_t suid, char *tName, TAOS_BIND_v2 *bind, char *msgBuf, int32_t msgBufLen); -void destroyBoundColumnInfo(void* pBoundInfo); +void destroyBoundColumnInfo(void* pBoundInfo); int32_t qCreateSName(SName* pName, const char* pTableName, int32_t acctId, char* dbName, char *msgBuf, int32_t msgBufLen); -void* tscSmlInitHandle(SQuery *pQuery); -void tscSmlDestroyHandle(void *pHandle); -int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta, char *msgBuf, int16_t msgBufLen); +void* smlInitHandle(SQuery *pQuery); +void smlDestroyHandle(void *pHandle); +int32_t smlBindData(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta, char *msgBuf, int16_t msgBufLen); int32_t smlBuildOutput(void* handle, SHashObj* pVgHash); #ifdef __cplusplus diff --git a/source/client/inc/clientSml.h b/source/client/inc/clientSml.h deleted file mode 100644 index c970f1e954..0000000000 --- a/source/client/inc/clientSml.h +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Copyright (c) 2021 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#ifndef TDENGINE_CLIENTSML_H -#define TDENGINE_CLIENTSML_H - -#ifdef __cplusplus -extern "C" { -#endif - -#include "thash.h" -#include "clientInt.h" -#include "catalog.h" - -typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; - -typedef struct { - const char* measure; - const char* tags; - const char* cols; - const char* timestamp; - - int32_t measureLen; - int32_t measureTagsLen; - int32_t tagsLen; - int32_t colsLen; - int32_t timestampLen; -} TAOS_PARSE_ELEMENTS; - -typedef struct { - const char *sTableName; // super table name - uint8_t sTableNameLen; - char childTableName[TSDB_TABLE_NAME_LEN]; - uint64_t uid; - - SArray *tags; - SArray *cols; // elements are SHashObj for find by key quickly - - SArray colsColumn; // elements are cols key string -} TAOS_SML_DATA_POINT_TAGS; - -typedef struct SSmlSTableMeta { -// char *sTableName; // super table name -// uint8_t sTableNameLen; - uint8_t precision; // the number of precision - SHashObj *tagHash; - SHashObj *fieldHash; -} SSmlSTableMeta; - -typedef struct SMsgBuf { - int32_t len; - char *buf; -} SMsgBuf; - -typedef struct { - uint64_t id; - - SMLProtocolType protocol; - int32_t tsType; - - SHashObj *childTables; - SHashObj *superTables; - - SHashObj *metaHashObj; - SHashObj *pVgHash; - - void *exec; - - STscObj *taos; - SCatalog *pCatalog; - SRequestObj *pRequest; - SQuery *pQuery; - - int32_t affectedRows; - SMsgBuf msgBuf; -} SSmlLinesInfo; - -int smlInsert(TAOS* taos, SSmlLinesInfo* info); - -bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info); -bool isValidInteger(char *str); -bool isValidFloat(char *str); - -int32_t isValidChildTableName(const char *pTbName, int16_t len, SSmlLinesInfo* info); - -bool convertSmlValueType(SSmlKv *pVal, char *value, - uint16_t len, SSmlLinesInfo* info, bool isTag); -int32_t convertSmlTimeStamp(SSmlKv *pVal, char *value, - uint16_t len, SSmlLinesInfo* info); - - -int sml_insert_lines(TAOS* taos, SRequestObj* request, char* lines[], int numLines, SMLProtocolType protocol, - SMLTimeStampType tsType); -int sml_insert_telnet_lines(TAOS* taos, char* lines[], int numLines, SMLProtocolType protocol, - SMLTimeStampType tsType, int* affectedRows); -int sml_insert_json_payload(TAOS* taos, char* payload, SMLProtocolType protocol, - SMLTimeStampType tsType, int* affectedRows); - - -#ifdef __cplusplus -} -#endif - -#endif // TDENGINE_CLIENTSML_H diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 3739c83109..bde9aeed49 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1,29 +1,21 @@ +#include #include #include #include #include -#include "clientSml.h" - -#include "tdef.h" -#include "ttypes.h" -#include "tmsg.h" -#include "tlog.h" #include "query.h" -#include "taoserror.h" #include "taos.h" -#include "ttime.h" +#include "taoserror.h" +#include "tdef.h" +#include "tlog.h" +#include "tmsg.h" #include "tstrbuild.h" - - -typedef struct { - char sTableName[TSDB_TABLE_NAME_LEN]; - SHashObj* tagHash; - SHashObj* fieldHash; - SArray* tags; //SArray - SArray* fields; //SArray - uint8_t precision; -} SSmlSTableSchema; +#include "ttime.h" +#include "ttypes.h" +#include "tcommon.h" +#include "catalog.h" +//================================================================================================= #define SPACE ' ' #define COMMA ',' @@ -32,43 +24,8 @@ typedef struct { #define SLASH '\\' #define tsMaxSQLStringLen (1024*1024) -#define TSNAMELEN 2 -#define TAGNAMELEN 3 //================================================================================================= - -static uint64_t linesSmlHandleId = 0; -static const char* TS = "ts"; -static const char* TAG = "tag"; - - -uint64_t genLinesSmlId() { - uint64_t id; - - do { - id = atomic_add_fetch_64(&linesSmlHandleId, 1); - } while (id == 0); - - return id; -} - -static int32_t buildInvalidDataMsg(SMsgBuf* pBuf, const char *msg1, const char *msg2) { - if(msg1) snprintf(pBuf->buf, pBuf->len, "%s:", msg1); - if(msg2) strncpy(pBuf->buf, msg2, pBuf->len); - return TSDB_CODE_SML_INVALID_DATA; -} - -int compareSmlColKv(const void* p1, const void* p2) { - SSmlKv* kv1 = (SSmlKv *)p1; - SSmlKv* kv2 = (SSmlKv*)p2; - int kvLen1 = (int)strlen(kv1->key); - int kvLen2 = (int)strlen(kv2->key); - int res = strncasecmp(kv1->key, kv2->key, MIN(kvLen1, kvLen2)); - if (res != 0) { - return res; - } else { - return kvLen1-kvLen2; - } -} +typedef TSDB_SML_PROTOCOL_TYPE SMLProtocolType; typedef enum { SCHEMA_ACTION_CREATE_STABLE, @@ -97,10 +54,107 @@ typedef struct { }; } SSchemaAction; -static int32_t buildSmlChildTableName(TAOS_SML_DATA_POINT_TAGS *tags) { +typedef struct { + const char* measure; + const char* tags; + const char* cols; + const char* timestamp; + + int32_t measureLen; + int32_t measureTagsLen; + int32_t tagsLen; + int32_t colsLen; + int32_t timestampLen; +} SSmlLineInfo; + +typedef struct { + const char *sTableName; // super table name + uint8_t sTableNameLen; + char childTableName[TSDB_TABLE_NAME_LEN]; + uint64_t uid; + + SArray *tags; + + // colsFormat store cols formated, for quick parse, if info->formatData is true + SArray *colsFormat; // elements are SArray + + // cols & colsColumn store cols un formated + SArray *cols; // elements are SHashObj for find by key quickly + SHashObj *columnsHash; // elements are , just for judge if key exists quickly. +} SSmlTableInfo; + +typedef struct { + SHashObj *tagHash; + SHashObj *fieldHash; + STableMeta *tableMeta; +} SSmlSTableMeta; + +typedef struct { + int32_t len; + char *buf; +} SSmlMsgBuf; + +typedef struct { + uint64_t id; + + SMLProtocolType protocol; + int8_t precision; + bool dataFormat; // true means that the name, number and order of keys in each line are the same + + SHashObj *childTables; + SHashObj *superTables; + SHashObj *pVgHash; + void *exec; + + STscObj *taos; + SCatalog *pCatalog; + SRequestObj *pRequest; + SQuery *pQuery; + + int32_t affectedRows; + SSmlMsgBuf msgBuf; +} SSmlHandle; +//================================================================================================= + +static uint64_t linesSmlHandleId = 0; +static const char* TS = "ts"; +static const char* TAG = "tagNone"; + +//================================================================================================= + +static uint64_t smlGenId() { + uint64_t id; + + do { + id = atomic_add_fetch_64(&linesSmlHandleId, 1); + } while (id == 0); + + return id; +} + +static int32_t smlBuildInvalidDataMsg(SSmlMsgBuf* pBuf, const char *msg1, const char *msg2) { + if(msg1) snprintf(pBuf->buf, pBuf->len, "%s:", msg1); + if(msg2) strncpy(pBuf->buf, msg2, pBuf->len); + return TSDB_CODE_SML_INVALID_DATA; +} + +static int smlCompareKv(const void* p1, const void* p2) { + SSmlKv* kv1 = (SSmlKv *)p1; + SSmlKv* kv2 = (SSmlKv*)p2; + int kvLen1 = (int)strlen(kv1->key); + int kvLen2 = (int)strlen(kv2->key); + int res = strncasecmp(kv1->key, kv2->key, MIN(kvLen1, kvLen2)); + if (res != 0) { + return res; + } else { + return kvLen1-kvLen2; + } +} + +static void smlBuildChildTableName(SSmlTableInfo *tags) { int32_t size = taosArrayGetSize(tags->tags); ASSERT(size > 0); - qsort(tags->tags, size, POINTER_BYTES, compareSmlColKv); + qsort(tags->tags, size, POINTER_BYTES, smlCompareKv); SStringBuilder sb = {0}; taosStringBuilderAppendStringLen(&sb, tags->sTableName, tags->sTableNameLen); @@ -120,12 +174,10 @@ static int32_t buildSmlChildTableName(TAOS_SML_DATA_POINT_TAGS *tags) { snprintf(tags->childTableName, TSDB_TABLE_NAME_LEN, "t_%016"PRIx64"%016"PRIx64, digest1, digest2); taosStringBuilderDestroy(&sb); tags->uid = digest1; - uDebug("SML: child table name: %s", tags->childTableName); - return 0; } -static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[], - SSchemaAction* action, bool* actionNeeded, SSmlLinesInfo* info) { +static int32_t smlGenerateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash, SArray* dbAttrArray, bool isTag, char sTableName[], + SSchemaAction* action, bool* actionNeeded, SSmlHandle* info) { // char fieldName[TSDB_COL_NAME_LEN] = {0}; // strcpy(fieldName, pointColField->name); // @@ -168,7 +220,7 @@ static int32_t generateSchemaAction(SSchema* pointColField, SHashObj* dbAttrHash return 0; } -static int32_t buildColumnDescription(SSmlKv* field, char* buf, int32_t bufSize, int32_t* outBytes) { +static int32_t smlBuildColumnDescription(SSmlKv* field, char* buf, int32_t bufSize, int32_t* outBytes) { uint8_t type = field->type; char tname[TSDB_TABLE_NAME_LEN] = {0}; memcpy(tname, field->key, field->keyLen); @@ -185,7 +237,7 @@ static int32_t buildColumnDescription(SSmlKv* field, char* buf, int32_t bufSize, return 0; } -static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInfo* info) { +static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { int32_t code = 0; int32_t outBytes = 0; char *result = (char *)taosMemoryCalloc(1, tsMaxSQLStringLen+1); @@ -195,8 +247,8 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf switch (action->action) { case SCHEMA_ACTION_ADD_COLUMN: { int n = sprintf(result, "alter stable %s add column ", action->alterSTable.sTableName); - buildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes); - TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery + smlBuildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes); + TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery code = taos_errno(res); const char* errStr = taos_errstr(res); char* begin = strstr(errStr, "duplicated column names"); @@ -208,7 +260,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf // if (code == TSDB_CODE_MND_FIELD_ALREADY_EXIST || code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) { if (code == TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) { - TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE"); + TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); code = taos_errno(res2); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); @@ -220,9 +272,9 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf } case SCHEMA_ACTION_ADD_TAG: { int n = sprintf(result, "alter stable %s add tag ", action->alterSTable.sTableName); - buildColumnDescription(action->alterSTable.field, + smlBuildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes); - TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery + TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery code = taos_errno(res); const char* errStr = taos_errstr(res); char* begin = strstr(errStr, "duplicated column names"); @@ -234,7 +286,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf // if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || code == TSDB_CODE_MND_FIELD_ALREAY_EXIST || tscDupColNames) { if (code ==TSDB_CODE_MND_TAG_ALREADY_EXIST || tscDupColNames) { - TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE"); + TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); code = taos_errno(res2); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); @@ -246,9 +298,9 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf } case SCHEMA_ACTION_CHANGE_COLUMN_SIZE: { int n = sprintf(result, "alter stable %s modify column ", action->alterSTable.sTableName); - buildColumnDescription(action->alterSTable.field, result+n, + smlBuildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes); - TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery + TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); @@ -257,7 +309,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf // if (code == TSDB_CODE_MND_INVALID_COLUMN_LENGTH || code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) { if (code == TSDB_CODE_TSC_INVALID_COLUMN_LENGTH) { - TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE"); + TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); code = taos_errno(res2); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); @@ -269,9 +321,9 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf } case SCHEMA_ACTION_CHANGE_TAG_SIZE: { int n = sprintf(result, "alter stable %s modify tag ", action->alterSTable.sTableName); - buildColumnDescription(action->alterSTable.field, result+n, + smlBuildColumnDescription(action->alterSTable.field, result+n, capacity-n, &outBytes); - TAOS_RES* res = taos_query(taos, result); //TODO async doAsyncQuery + TAOS_RES* res = taos_query(info->taos, result); //TODO async doAsyncQuery code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); @@ -280,7 +332,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf // if (code == TSDB_CODE_MND_INVALID_TAG_LENGTH || code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) { if (code == TSDB_CODE_TSC_INVALID_TAG_LENGTH) { - TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE"); + TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); code = taos_errno(res2); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); @@ -296,7 +348,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf SSmlKv **kv = taosHashIterate(action->createSTable.fields, NULL); while(kv){ - buildColumnDescription(*kv, pos, freeBytes, &outBytes); + smlBuildColumnDescription(*kv, pos, freeBytes, &outBytes); pos += outBytes; freeBytes -= outBytes; *pos = ','; ++pos; --freeBytes; kv = taosHashIterate(action->createSTable.fields, kv); @@ -308,14 +360,14 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf kv = taosHashIterate(action->createSTable.tags, NULL); while(kv){ - buildColumnDescription(*kv, pos, freeBytes, &outBytes); + smlBuildColumnDescription(*kv, pos, freeBytes, &outBytes); pos += outBytes; freeBytes -= outBytes; *pos = ','; ++pos; --freeBytes; kv = taosHashIterate(action->createSTable.tags, kv); } pos--; ++freeBytes; outBytes = snprintf(pos, freeBytes, ")"); - TAOS_RES* res = taos_query(taos, result); + TAOS_RES* res = taos_query(info->taos, result); code = taos_errno(res); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%"PRIx64" apply schema action. error : %s", info->id, taos_errstr(res)); @@ -323,7 +375,7 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf taos_free_result(res); if (code == TSDB_CODE_MND_STB_ALREADY_EXIST) { - TAOS_RES* res2 = taos_query(taos, "RESET QUERY CACHE"); + TAOS_RES* res2 = taos_query(info->taos, "RESET QUERY CACHE"); code = taos_errno(res2); if (code != TSDB_CODE_SUCCESS) { uError("SML:0x%" PRIx64 " apply schema action. reset query cache. error: %s", info->id, taos_errstr(res2)); @@ -338,14 +390,14 @@ static int32_t applySchemaAction(TAOS* taos, SSchemaAction* action, SSmlLinesInf break; } - taosMemoryFree(result); + taosMemoryFreeClear(result); if (code != 0) { uError("SML:0x%"PRIx64 " apply schema action failure. %s", info->id, tstrerror(code)); } return code; } -static int32_t modifyDBSchemas(TAOS* taos, SSmlLinesInfo* info) { +static int32_t smlModifyDBSchemas(SSmlHandle* info) { int32_t code = 0; SSmlSTableMeta** tableMetaSml = taosHashIterate(info->superTables, NULL); @@ -356,7 +408,7 @@ static int32_t modifyDBSchemas(TAOS* taos, SSmlLinesInfo* info) { SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp); size_t superTableLen = 0; - void *superTable = taosHashGetKey(tableMetaSml, &superTableLen); + void *superTable = taosHashGetKey(tableMetaSml, &superTableLen); // todo escape SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; strcpy(pName.dbname, info->pRequest->pDb); memcpy(pName.tname, superTable, superTableLen); @@ -369,10 +421,15 @@ static int32_t modifyDBSchemas(TAOS* taos, SSmlLinesInfo* info) { memcpy(schemaAction.createSTable.sTableName, superTable, superTableLen); schemaAction.createSTable.tags = cTablePoints->tagHash; schemaAction.createSTable.fields = cTablePoints->fieldHash; - applySchemaAction(taos, &schemaAction, info); + code = smlApplySchemaAction(info, &schemaAction); + if (code != 0) { + uError("SML:0x%"PRIx64" smlApplySchemaAction failed. can not create %s", info->id, schemaAction.createSTable.sTableName); + return code; + } + code = catalogGetSTableMeta(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &pTableMeta); if (code != 0) { - uError("SML:0x%"PRIx64" reconcile point schema failed. can not create %s", info->id, schemaAction.createSTable.sTableName); + uError("SML:0x%"PRIx64" catalogGetSTableMeta failed. super table name %s", info->id, schemaAction.createSTable.sTableName); return code; } }else if (code == TSDB_CODE_SUCCESS) { @@ -380,67 +437,13 @@ static int32_t modifyDBSchemas(TAOS* taos, SSmlLinesInfo* info) { uError("SML:0x%"PRIx64" load table meta error: %s", info->id, tstrerror(code)); return code; } - taosHashPut(info->metaHashObj, superTable, superTableLen, &pTableMeta, POINTER_BYTES); + cTablePoints->tableMeta = pTableMeta; tableMetaSml = taosHashIterate(info->superTables, tableMetaSml); } return 0; } -static int32_t applyDataPoints(SSmlLinesInfo* info) { - int32_t code = TSDB_CODE_SUCCESS; - - TAOS_SML_DATA_POINT_TAGS** oneTable = taosHashIterate(info->childTables, NULL); - while (oneTable) { - TAOS_SML_DATA_POINT_TAGS* tableData = *oneTable; - - SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; - strcpy(pName.dbname, info->pRequest->pDb); - memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName)); - SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp); - SVgroupInfo vg; - catalogGetTableHashVgroup(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &vg); - taosHashPut(info->pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); - - STableMeta** pMeta = taosHashGet(info->metaHashObj, tableData->sTableName, tableData->sTableNameLen); - ASSERT (NULL != pMeta && NULL != *pMeta); - (*pMeta)->vgId = vg.vgId; - (*pMeta)->uid = tableData->uid; // one table merge data block together according uid - - code = smlBind(info->exec, tableData->tags, tableData->cols, *pMeta, info->msgBuf.buf, info->msgBuf.len); - if(code != TSDB_CODE_SUCCESS){ - return code; - } - oneTable = taosHashIterate(info->childTables, oneTable); - } - - smlBuildOutput(info->exec, info->pVgHash); - launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true); - - info->affectedRows = taos_affected_rows(info->pRequest); - return info->pRequest->code; -} - -int smlInsert(TAOS* taos, SSmlLinesInfo* info) { - uDebug("SML:0x%"PRIx64" taos_sml_insert. number of super tables: %d", info->id, taosHashGetSize(info->superTables)); - - uDebug("SML:0x%"PRIx64" modify db schemas", info->id); - int32_t code = modifyDBSchemas(taos, info); - if (code != 0) { - uError("SML:0x%"PRIx64" error change db schema : %s", info->id, tstrerror(code)); - return code; - } - - uDebug("SML:0x%"PRIx64" apply data points", info->id); - code = applyDataPoints(info); - if (code != 0) { - uError("SML:0x%"PRIx64" error apply data points : %s", info->id, tstrerror(code)); - return code; - } - - return TSDB_CODE_SUCCESS; -} - //========================================================================= /* Field Escape charaters @@ -448,50 +451,50 @@ int smlInsert(TAOS* taos, SSmlLinesInfo* info) { 2: tag_key, tag_value, field_key Comma,Equal Sign,Space 3: field_value Double quote,Backslash */ -static void escapeSpecialCharacter(uint8_t field, const char **pos) { - const char *cur = *pos; - if (*cur != '\\') { - return; - } - switch (field) { - case 1: - switch (*(cur + 1)) { - case ',': - case ' ': - cur++; - break; - default: - break; - } - break; - case 2: - switch (*(cur + 1)) { - case ',': - case ' ': - case '=': - cur++; - break; - default: - break; - } - break; - case 3: - switch (*(cur + 1)) { - case '"': - case '\\': - cur++; - break; - default: - break; - } - break; - default: - break; - } - *pos = cur; -} +//static void escapeSpecialCharacter(uint8_t field, const char **pos) { +// const char *cur = *pos; +// if (*cur != '\\') { +// return; +// } +// switch (field) { +// case 1: +// switch (*(cur + 1)) { +// case ',': +// case ' ': +// cur++; +// break; +// default: +// break; +// } +// break; +// case 2: +// switch (*(cur + 1)) { +// case ',': +// case ' ': +// case '=': +// cur++; +// break; +// default: +// break; +// } +// break; +// case 3: +// switch (*(cur + 1)) { +// case '"': +// case '\\': +// cur++; +// break; +// default: +// break; +// } +// break; +// default: +// break; +// } +// *pos = cur; +//} -static bool parseTinyInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { +static bool smlParseTinyInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { const char *pVal = kvVal->value; int32_t len = kvVal->valueLen; if (len <= 2) { @@ -503,10 +506,10 @@ static bool parseTinyInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { int64_t result = strtoll(pVal, &endptr, 10); if(endptr != signalPos){ // 78ri8 *isValid = false; - buildInvalidDataMsg(msg, "invalid tiny int", endptr); + smlBuildInvalidDataMsg(msg, "invalid tiny int", endptr); }else if(!IS_VALID_TINYINT(result)){ *isValid = false; - buildInvalidDataMsg(msg, "tiny int out of range[-128,127]", endptr); + smlBuildInvalidDataMsg(msg, "tiny int out of range[-128,127]", endptr); }else{ kvVal->i = result; *isValid = true; @@ -516,7 +519,7 @@ static bool parseTinyInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { return false; } -static bool parseTinyUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { +static bool smlParseTinyUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { const char *pVal = kvVal->value; int32_t len = kvVal->valueLen; if (len <= 2) { @@ -531,10 +534,10 @@ static bool parseTinyUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { int64_t result = strtoll(pVal, &endptr, 10); if(endptr != signalPos){ // 78ri8 *isValid = false; - buildInvalidDataMsg(msg, "invalid unsigned tiny int", endptr); + smlBuildInvalidDataMsg(msg, "invalid unsigned tiny int", endptr); }else if(!IS_VALID_UTINYINT(result)){ *isValid = false; - buildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", endptr); + smlBuildInvalidDataMsg(msg, "unsigned tiny int out of range[0,255]", endptr); }else{ kvVal->i = result; *isValid = true; @@ -544,7 +547,7 @@ static bool parseTinyUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { return false; } -static bool parseSmallInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { +static bool smlParseSmallInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { const char *pVal = kvVal->value; int32_t len = kvVal->valueLen; if (len <= 3) { @@ -556,10 +559,10 @@ static bool parseSmallInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { int64_t result = strtoll(pVal, &endptr, 10); if(endptr != signalPos){ // 78ri8 *isValid = false; - buildInvalidDataMsg(msg, "invalid small int", endptr); + smlBuildInvalidDataMsg(msg, "invalid small int", endptr); }else if(!IS_VALID_SMALLINT(result)){ *isValid = false; - buildInvalidDataMsg(msg, "small int our of range[-32768,32767]", endptr); + smlBuildInvalidDataMsg(msg, "small int our of range[-32768,32767]", endptr); }else{ kvVal->i = result; *isValid = true; @@ -569,7 +572,7 @@ static bool parseSmallInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { return false; } -static bool parseSmallUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { +static bool smlParseSmallUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { const char *pVal = kvVal->value; int32_t len = kvVal->valueLen; if (len <= 3) { @@ -584,10 +587,10 @@ static bool parseSmallUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { int64_t result = strtoll(pVal, &endptr, 10); if(endptr != signalPos){ // 78ri8 *isValid = false; - buildInvalidDataMsg(msg, "invalid unsigned small int", endptr); + smlBuildInvalidDataMsg(msg, "invalid unsigned small int", endptr); }else if(!IS_VALID_USMALLINT(result)){ *isValid = false; - buildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", endptr); + smlBuildInvalidDataMsg(msg, "unsigned small int out of rang[0,65535]", endptr); }else{ kvVal->i = result; *isValid = true; @@ -597,7 +600,7 @@ static bool parseSmallUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { return false; } -static bool parseInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { +static bool smlParseInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { const char *pVal = kvVal->value; int32_t len = kvVal->valueLen; if (len <= 3) { @@ -609,10 +612,10 @@ static bool parseInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { int64_t result = strtoll(pVal, &endptr, 10); if(endptr != signalPos){ // 78ri8 *isValid = false; - buildInvalidDataMsg(msg, "invalid int", endptr); + smlBuildInvalidDataMsg(msg, "invalid int", endptr); }else if(!IS_VALID_INT(result)){ *isValid = false; - buildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", endptr); + smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", endptr); }else{ kvVal->i = result; *isValid = true; @@ -622,7 +625,7 @@ static bool parseInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { return false; } -static bool parseUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { +static bool smlParseUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { const char *pVal = kvVal->value; int32_t len = kvVal->valueLen; if (len <= 3) { @@ -637,10 +640,10 @@ static bool parseUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { int64_t result = strtoll(pVal, &endptr, 10); if(endptr != signalPos){ // 78ri8 *isValid = false; - buildInvalidDataMsg(msg, "invalid unsigned int", endptr); + smlBuildInvalidDataMsg(msg, "invalid unsigned int", endptr); }else if(!IS_VALID_UINT(result)){ *isValid = false; - buildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", endptr); + smlBuildInvalidDataMsg(msg, "unsigned int out of range[0,4294967295]", endptr); }else{ kvVal->i = result; *isValid = true; @@ -650,7 +653,7 @@ static bool parseUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { return false; } -static bool parseBigInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { +static bool smlParseBigInt(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { const char *pVal = kvVal->value; int32_t len = kvVal->valueLen; if (len > 3 && strcasecmp(pVal + len - 3, "i64") == 0) { @@ -670,10 +673,10 @@ static bool parseBigInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { int64_t result = strtoll(pVal, &endptr, 10); if(endptr != pVal + len - 1){ // 78ri8 *isValid = false; - buildInvalidDataMsg(msg, "invalid big int", endptr); + smlBuildInvalidDataMsg(msg, "invalid big int", endptr); }else if(!IS_VALID_BIGINT(result)){ *isValid = false; - buildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", endptr); + smlBuildInvalidDataMsg(msg, "big int out of range[-9223372036854775808,9223372036854775807]", endptr); }else{ kvVal->i = result; *isValid = true; @@ -683,7 +686,7 @@ static bool parseBigInt(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { return false; } -static bool parseBigUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { +static bool smlParseBigUint(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { const char *pVal = kvVal->value; int32_t len = kvVal->valueLen; if (len <= 3) { @@ -698,10 +701,10 @@ static bool parseBigUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { uint64_t result = strtoull(pVal, &endptr, 10); if(endptr != signalPos){ // 78ri8 *isValid = false; - buildInvalidDataMsg(msg, "invalid unsigned big int", endptr); + smlBuildInvalidDataMsg(msg, "invalid unsigned big int", endptr); }else if(!IS_VALID_UBIGINT(result)){ *isValid = false; - buildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", endptr); + smlBuildInvalidDataMsg(msg, "unsigned big int out of range[0,18446744073709551615]", endptr); }else{ kvVal->u = result; *isValid = true; @@ -711,7 +714,7 @@ static bool parseBigUint(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { return false; } -static bool parseFloat(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { +static bool smlParseFloat(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { const char *pVal = kvVal->value; int32_t len = kvVal->valueLen; char *endptr = NULL; @@ -725,10 +728,10 @@ static bool parseFloat(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { if (len > 3 && len f = result; *isValid = true; @@ -738,7 +741,7 @@ static bool parseFloat(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { return false; } -static bool parseDouble(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { +static bool smlParseDouble(SSmlKv *kvVal, bool *isValid, SSmlMsgBuf *msg) { const char *pVal = kvVal->value; int32_t len = kvVal->valueLen; if (len <= 3) { @@ -750,10 +753,10 @@ static bool parseDouble(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { double result = strtod(pVal, &endptr); if(endptr != signalPos){ // 78ri8 *isValid = false; - buildInvalidDataMsg(msg, "invalid double", endptr); + smlBuildInvalidDataMsg(msg, "invalid double", endptr); }else if(!IS_VALID_DOUBLE(result)){ *isValid = false; - buildInvalidDataMsg(msg, "double out of range[-1.7976931348623158e+308,1.7976931348623158e+308]", endptr); + smlBuildInvalidDataMsg(msg, "double out of range[-1.7976931348623158e+308,1.7976931348623158e+308]", endptr); }else{ kvVal->d = result; *isValid = true; @@ -763,70 +766,63 @@ static bool parseDouble(SSmlKv *kvVal, bool *isValid, SMsgBuf *msg) { return false; } -static bool parseBool(SSmlKv *kvVal) { +static bool smlParseBool(SSmlKv *kvVal) { const char *pVal = kvVal->value; int32_t len = kvVal->valueLen; if ((len == 1) && pVal[len - 1] == 't') { - //printf("Type is bool(%c)\n", pVal[len - 1]); kvVal->i = true; return true; } if ((len == 1) && pVal[len - 1] == 'f') { - //printf("Type is bool(%c)\n", pVal[len - 1]); kvVal->i = false; return true; } if((len == 4) && !strcasecmp(pVal, "true")) { - //printf("Type is bool(%s)\n", &pVal[len - 4]); kvVal->i = true; return true; } if((len == 5) && !strcasecmp(pVal, "false")) { - //printf("Type is bool(%s)\n", &pVal[len - 5]); kvVal->i = false; return true; } return false; } -static bool isBinary(const char *pVal, uint16_t len) { +static bool smlIsBinary(const char *pVal, uint16_t len) { //binary: "abc" if (len < 2) { return false; } - //binary if (pVal[0] == '"' && pVal[len - 1] == '"') { - //printf("Type is binary(%s)\n", pVal); return true; } return false; } -static bool isNchar(const char *pVal, uint16_t len) { +static bool smlIsNchar(const char *pVal, uint16_t len) { //nchar: L"abc" if (len < 3) { return false; } if ((pVal[0] == 'l' || pVal[0] == 'L')&& pVal[1] == '"' && pVal[len - 1] == '"') { - //printf("Type is nchar(%s)\n", pVal); return true; } return false; } -static bool convertSmlValue(SSmlKv *pVal, SMsgBuf *msg) { +static bool smlParseValue(SSmlKv *pVal, SSmlMsgBuf *msg) { // put high probability matching type first bool isValid = false; - if (parseFloat(pVal, &isValid, msg)) { + if (smlParseFloat(pVal, &isValid, msg)) { if(!isValid) return false; pVal->type = TSDB_DATA_TYPE_FLOAT; pVal->length = (int16_t)tDataTypes[pVal->type].bytes; return true; } //binary - if (isBinary(pVal->value, pVal->valueLen)) { + if (smlIsBinary(pVal->value, pVal->valueLen)) { pVal->type = TSDB_DATA_TYPE_BINARY; pVal->length = pVal->valueLen - 2; pVal->valueLen -= 2; @@ -834,13 +830,13 @@ static bool convertSmlValue(SSmlKv *pVal, SMsgBuf *msg) { return true; } //nchar - if (isNchar(pVal->value, pVal->valueLen)) { + if (smlIsNchar(pVal->value, pVal->valueLen)) { pVal->type = TSDB_DATA_TYPE_NCHAR; pVal->length = pVal->valueLen - 3; pVal->value = pVal->value+2; return true; } - if (parseDouble(pVal, &isValid, msg)) { + if (smlParseDouble(pVal, &isValid, msg)) { if(!isValid) return false; pVal->type = TSDB_DATA_TYPE_DOUBLE; pVal->length = (int16_t)tDataTypes[pVal->type].bytes; @@ -848,66 +844,66 @@ static bool convertSmlValue(SSmlKv *pVal, SMsgBuf *msg) { return true; } //bool - if (parseBool(pVal)) { + if (smlParseBool(pVal)) { pVal->type = TSDB_DATA_TYPE_BOOL; pVal->length = (int16_t)tDataTypes[pVal->type].bytes; return true; } - if (parseTinyInt(pVal, &isValid, msg)) { + if (smlParseTinyInt(pVal, &isValid, msg)) { if(!isValid) return false; pVal->type = TSDB_DATA_TYPE_TINYINT; pVal->length = (int16_t)tDataTypes[pVal->type].bytes; return true; } - if (parseTinyUint(pVal, &isValid, msg)) { + if (smlParseTinyUint(pVal, &isValid, msg)) { if(!isValid) return false; pVal->type = TSDB_DATA_TYPE_UTINYINT; pVal->length = (int16_t)tDataTypes[pVal->type].bytes; return true; } - if (parseSmallInt(pVal, &isValid, msg)) { + if (smlParseSmallInt(pVal, &isValid, msg)) { if(!isValid) return false; pVal->type = TSDB_DATA_TYPE_SMALLINT; pVal->length = (int16_t)tDataTypes[pVal->type].bytes; return true; } - if (parseSmallUint(pVal, &isValid, msg)) { + if (smlParseSmallUint(pVal, &isValid, msg)) { if(!isValid) return false; pVal->type = TSDB_DATA_TYPE_USMALLINT; pVal->length = (int16_t)tDataTypes[pVal->type].bytes; return true; } - if (parseInt(pVal, &isValid, msg)) { + if (smlParseInt(pVal, &isValid, msg)) { if(!isValid) return false; pVal->type = TSDB_DATA_TYPE_INT; pVal->length = (int16_t)tDataTypes[pVal->type].bytes; return true; } - if (parseUint(pVal, &isValid, msg)) { + if (smlParseUint(pVal, &isValid, msg)) { if(!isValid) return false; pVal->type = TSDB_DATA_TYPE_UINT; pVal->length = (int16_t)tDataTypes[pVal->type].bytes; return true; } - if (parseBigInt(pVal, &isValid, msg)) { + if (smlParseBigInt(pVal, &isValid, msg)) { if(!isValid) return false; pVal->type = TSDB_DATA_TYPE_BIGINT; pVal->length = (int16_t)tDataTypes[pVal->type].bytes; return true; } - if (parseBigUint(pVal, &isValid, msg)) { + if (smlParseBigUint(pVal, &isValid, msg)) { if(!isValid) return false; pVal->type = TSDB_DATA_TYPE_UBIGINT; pVal->length = (int16_t)tDataTypes[pVal->type].bytes; return true; } - buildInvalidDataMsg(msg, "invalid data", pVal->value); + smlBuildInvalidDataMsg(msg, "invalid data", pVal->value); return false; } -bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) { +static bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlHandle* info) { char *val = NULL; val = taosHashGet(pHash, key, strlen(key)); if (val) { @@ -921,7 +917,7 @@ bool checkDuplicateKey(char *key, SHashObj *pHash, SSmlLinesInfo* info) { return false; } -int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){ +static int32_t smlParseString(const char* sql, SSmlLineInfo *elements, SSmlMsgBuf *msg){ if(!sql) return TSDB_CODE_SML_INVALID_DATA; while (*sql != '\0') { // jump the space at the begining if(*sql != SPACE) { @@ -930,7 +926,10 @@ int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){ } sql++; } - if (!elements->measure || *sql == COMMA) return TSDB_CODE_SML_INVALID_DATA; + if (!elements->measure || *sql == COMMA) { + smlBuildInvalidDataMsg(msg, "invalid data", sql); + return TSDB_CODE_SML_INVALID_DATA; + } // parse measure and tag while (*sql != '\0') { @@ -953,7 +952,13 @@ int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){ sql++; } - if(elements->measureLen == 0) return TSDB_CODE_SML_INVALID_DATA; + if(elements->tagsLen == 0){ // measure, cols1=a measure cols1=a + elements->measureTagsLen = elements->measureLen; + } + if(elements->measureLen == 0) { + smlBuildInvalidDataMsg(msg, "invalid measure", elements->measure); + return TSDB_CODE_SML_INVALID_DATA; + } // parse cols while (*sql != '\0') { @@ -963,7 +968,10 @@ int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){ } sql++; } - if(!elements->cols) return TSDB_CODE_SML_INVALID_DATA; + if(!elements->cols) { + smlBuildInvalidDataMsg(msg, "invalid columns", elements->cols); + return TSDB_CODE_SML_INVALID_DATA; + } bool isInQuote = false; while (*sql != '\0') { @@ -992,13 +1000,13 @@ int32_t parseSml(const char* sql, TAOS_PARSE_ELEMENTS *elements){ return TSDB_CODE_SUCCESS; } -bool parseSmlCols(const char* data, int32_t len, SArray *cols, bool isTag, SMsgBuf *msg){ +static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SSmlMsgBuf *msg){ if(isTag && len == 0){ SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1); kv->key = TAG; - kv->keyLen = TAGNAMELEN; + kv->keyLen = strlen(TAG); kv->value = TAG; - kv->valueLen = TAGNAMELEN; + kv->valueLen = strlen(TAG); kv->type = TSDB_DATA_TYPE_NCHAR; if(cols) taosArrayPush(cols, &kv); return true; @@ -1016,7 +1024,7 @@ bool parseSmlCols(const char* data, int32_t len, SArray *cols, bool isTag, SMsgB i++; } if(keyLen == 0 || keyLen >= TSDB_COL_NAME_LEN){ - buildInvalidDataMsg(msg, "invalid key or key is too long than 64", key); + smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key); return TSDB_CODE_SML_INVALID_DATA; } @@ -1031,7 +1039,7 @@ bool parseSmlCols(const char* data, int32_t len, SArray *cols, bool isTag, SMsgB } int32_t valueLen = data + i - value; if(valueLen == 0){ - buildInvalidDataMsg(msg, "invalid value", value); + smlBuildInvalidDataMsg(msg, "invalid value", value); return TSDB_CODE_SML_INVALID_DATA; } @@ -1044,7 +1052,7 @@ bool parseSmlCols(const char* data, int32_t len, SArray *cols, bool isTag, SMsgB if(isTag){ kv->type = TSDB_DATA_TYPE_NCHAR; }else{ - if(!convertSmlValue(kv, msg)){ + if(!smlParseValue(kv, msg)){ return TSDB_CODE_SML_INVALID_DATA; } } @@ -1055,53 +1063,56 @@ bool parseSmlCols(const char* data, int32_t len, SArray *cols, bool isTag, SMsgB return TSDB_CODE_SUCCESS; } -static int64_t getTimeStampValue(const char *value, int32_t type) { - double ts = (double)strtoll(value, NULL, 10); +static int64_t smlGetTimeValue(const char *value, int32_t len, int8_t type) { + char *endPtr = NULL; + double ts = (double)strtoll(value, &endPtr, 10); + if(value + len != endPtr){ + return -1; + } switch (type) { case TSDB_TIME_PRECISION_HOURS: ts *= (3600 * 1e9); + break; case TSDB_TIME_PRECISION_MINUTES: ts *= (60 * 1e9); + break; case TSDB_TIME_PRECISION_SECONDS: ts *= (1e9); + break; case TSDB_TIME_PRECISION_MICRO: ts *= (1e6); + break; case TSDB_TIME_PRECISION_MILLI: ts *= (1e3); - default: break; + default: + ASSERT(0); } if(ts > (double)INT64_MAX || ts < 0){ return -1; - }else{ - return (int64_t)ts; } + + return (int64_t)ts; } -static int64_t getTimeStampNow(int32_t precision) { +static int64_t smlGetTimeNow(int8_t precision) { switch (precision) { case TSDB_TIME_PRECISION_HOURS: return taosGetTimestampMs()/1000/3600; case TSDB_TIME_PRECISION_MINUTES: return taosGetTimestampMs()/1000/60; - case TSDB_TIME_PRECISION_SECONDS: return taosGetTimestampMs()/1000; - default: + case TSDB_TIME_PRECISION_MILLI: + case TSDB_TIME_PRECISION_MICRO: + case TSDB_TIME_PRECISION_NANO: return taosGetTimestamp(precision); + default: + ASSERT(0); } } -static bool isValidateTimeStamp(const char *pVal, int32_t len) { - for (int i = 0; i < len; ++i) { - if (!isdigit(pVal[i])) { - return false; - } - } - return true; -} - -static int32_t getTsType(int32_t len) { +static int8_t smlGetTsTypeByLen(int32_t len) { if (len == TSDB_TIME_PRECISION_SEC_DIGITS) { return TSDB_TIME_PRECISION_SECONDS; } else if (len == TSDB_TIME_PRECISION_MILLI_DIGITS) { @@ -1111,52 +1122,87 @@ static int32_t getTsType(int32_t len) { } } -static int32_t parseSmlTS(const char* data, int32_t len, SArray *tags, SSmlLinesInfo* info){ - int64_t ts = 0; - if(data == NULL){ - if(info->protocol != TSDB_SML_LINE_PROTOCOL){ - buildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL); - return TSDB_CODE_TSC_INVALID_TIME_STAMP; - } - ts = getTimeStampNow(info->tsType); - }else{ - int ret = isValidateTimeStamp(data, len); - if(!ret){ - buildInvalidDataMsg(&info->msgBuf, "timestamp must be digit", data); - return TSDB_CODE_TSC_INVALID_TIME_STAMP; - } - int32_t tsType = -1; - if(info->protocol != TSDB_SML_LINE_PROTOCOL){ - tsType = getTsType(len); - if (tsType == -1) { - buildInvalidDataMsg(&info->msgBuf, "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data); - return TSDB_CODE_TSC_INVALID_TIME_STAMP; - } - }else{ - tsType = info->tsType; - } - ts = getTimeStampValue(data, tsType); - if(ts == -1){ - buildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data); - return TSDB_CODE_TSC_INVALID_TIME_STAMP; - } +static int8_t smlGetTsTypeByPrecision(int8_t precision) { + switch (precision) { + case TSDB_SML_TIMESTAMP_HOURS: + return TSDB_TIME_PRECISION_HOURS; + case TSDB_SML_TIMESTAMP_MILLI_SECONDS: + return TSDB_TIME_PRECISION_MILLI; + case TSDB_SML_TIMESTAMP_NANO_SECONDS: + case TSDB_SML_TIMESTAMP_NOT_CONFIGURED: + return TSDB_TIME_PRECISION_NANO; + case TSDB_SML_TIMESTAMP_MICRO_SECONDS: + return TSDB_TIME_PRECISION_MICRO; + case TSDB_SML_TIMESTAMP_SECONDS: + return TSDB_TIME_PRECISION_SECONDS; + case TSDB_SML_TIMESTAMP_MINUTES: + return TSDB_TIME_PRECISION_MINUTES; + default: + return -1; + } +} + +static int64_t smlParseInfluxTime(SSmlHandle* info, const char* data, int32_t len){ + int8_t tsType = smlGetTsTypeByPrecision(info->precision); + if (tsType == -1) { + smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp precision", NULL); + return -1; + } + if(!data){ + return smlGetTimeNow(tsType); } + int64_t ts = smlGetTimeValue(data, len, tsType); + if(ts == -1){ + smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data); + return -1; + } + return ts; +} + +static int64_t smlParseOpenTsdbTime(SSmlHandle* info, const char* data, int32_t len){ + if(!data){ + smlBuildInvalidDataMsg(&info->msgBuf, "timestamp can not be null", NULL); + return -1; + } + int8_t tsType = smlGetTsTypeByLen(len); + if (tsType == -1) { + smlBuildInvalidDataMsg(&info->msgBuf, "timestamp precision can only be seconds(10 digits) or milli seconds(13 digits)", data); + return -1; + } + int64_t ts = smlGetTimeValue(data, len, tsType); + if(ts == -1){ + smlBuildInvalidDataMsg(&info->msgBuf, "invalid timestamp", data); + return -1; + } + return ts; +} + +static int32_t smlParseTS(SSmlHandle* info, const char* data, int32_t len, SArray *cols){ + int64_t ts = 0; + if(info->protocol == TSDB_SML_LINE_PROTOCOL){ + ts = smlParseInfluxTime(info, data, len); + }else{ + ts = smlParseOpenTsdbTime(info, data, len); + } + if(ts == -1) return TSDB_CODE_TSC_INVALID_TIME_STAMP; + + // add ts to SSmlKv *kv = taosMemoryCalloc(sizeof(SSmlKv), 1); if(!kv){ return TSDB_CODE_OUT_OF_MEMORY; } kv->key = TS; - kv->keyLen = TSNAMELEN; + kv->keyLen = strlen(kv->key); kv->i = ts; kv->type = TSDB_DATA_TYPE_TIMESTAMP; kv->length = (int16_t)tDataTypes[kv->type].bytes; - if(tags) taosArrayPush(tags, &kv); + if(cols) taosArrayPush(cols, &kv); return TSDB_CODE_SUCCESS; } -//int32_t parseSmlCols(const char* data, SArray *cols){ +//static int32_t parseSmlCols(const char* data, SArray *cols){ // while(*data != '\0'){ // if(*data == EQUAL) return TSDB_CODE_SML_INVALID_DATA; // const char *key = data; @@ -1199,7 +1245,7 @@ static int32_t parseSmlTS(const char* data, int32_t len, SArray *tags, SSmlLines // return TSDB_CODE_SUCCESS; //} -bool updateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SMsgBuf *msg){ +static bool smlUpdateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SSmlMsgBuf *msg){ if(tags){ for (int i = 0; i < taosArrayGetSize(tags); ++i) { SSmlKv *kv = taosArrayGetP(tags, i); @@ -1223,7 +1269,7 @@ bool updateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SMsgBuf * SSmlKv **value = taosHashGet(tableMeta->fieldHash, kv->key, kv->keyLen); if(value){ if(kv->type != (*value)->type){ - buildInvalidDataMsg(msg, "the type is not the same like before", kv->key); + smlBuildInvalidDataMsg(msg, "the type is not the same like before", kv->key); return false; }else{ if(IS_VAR_DATA_TYPE(kv->type)){ // update string len, if bigger @@ -1237,9 +1283,10 @@ bool updateMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols, SMsgBuf * } } } + return true; } -void insertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){ +static void smlInsertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){ if(tags){ for (int i = 0; i < taosArrayGetSize(tags); ++i) { SSmlKv *kv = taosArrayGetP(tags, i); @@ -1255,87 +1302,194 @@ void insertMeta(SSmlSTableMeta* tableMeta, SArray *tags, SArray *cols){ } } -static int32_t smlParseLine(const char* sql, SSmlLinesInfo* info) { - TAOS_PARSE_ELEMENTS elements = {0}; - int ret = parseSml(sql, &elements); +static SSmlTableInfo* smlBuildTableInfo(bool format){ + SSmlTableInfo *tag = taosMemoryCalloc(sizeof(SSmlTableInfo), 1); + if(!tag){ + return NULL; + } + + if(format){ + tag->colsFormat = taosArrayInit(16, POINTER_BYTES); + if (tag->colsFormat == NULL) { + uError("SML:smlParseLine failed to allocate memory"); + goto cleanup; + } + }else{ + tag->cols = taosArrayInit(16, POINTER_BYTES); + if (tag->cols == NULL) { + uError("SML:smlParseLine failed to allocate memory"); + goto cleanup; + } + + tag->columnsHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false); + if (tag->columnsHash == NULL) { + uError("SML:smlParseLine failed to allocate memory"); + goto cleanup; + } + } + + tag->tags = taosArrayInit(16, POINTER_BYTES); + if (tag->tags == NULL) { + uError("SML:smlParseLine failed to allocate memory"); + goto cleanup; + } + return tag; + +cleanup: + taosMemoryFreeClear(tag); + return NULL; +} + +static void smlDestroyBuildTableInfo(SSmlTableInfo *tag, bool format){ + if(format){ + taosArrayDestroy(tag->colsFormat); + }else{ + tag->cols = taosArrayInit(16, POINTER_BYTES); + for(size_t i = 0; i < taosArrayGetSize(tag->cols); i++){ + SHashObj *kvHash = taosArrayGetP(tag->cols, i); + void** p1 = taosHashIterate(kvHash, NULL); + while (p1) { + SSmlKv* kv = *p1; + taosMemoryFreeClear(kv); + p1 = taosHashIterate(kvHash, p1); + } + taosHashCleanup(kvHash); + } + taosHashCleanup(tag->columnsHash); + } + taosArrayDestroy(tag->tags); + taosMemoryFreeClear(tag); +} + +static int32_t smlDealCols(SSmlTableInfo* oneTable, bool dataFormat, SArray *cols){ + if(dataFormat){ + taosArrayPush(oneTable->colsFormat, &cols); + }else{ + SHashObj *kvHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false); + if(!kvHash){ + uError("SML:smlDealCols failed to allocate memory"); + return TSDB_CODE_TSC_OUT_OF_MEMORY; + } + for(size_t i = 0; i < taosArrayGetSize(cols); i++){ + SSmlKv *kv = taosArrayGetP(cols, i); + taosHashPut(kvHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); // todo key need escape, like \=, because find by schema name later + + if(taosHashGet(oneTable->columnsHash, kv->key, kv->keyLen) != NULL){ + continue; + } + taosHashPut(oneTable->columnsHash, kv->key, kv->keyLen, &kv, POINTER_BYTES); + } + taosArrayPush(oneTable->cols, &kvHash); + } +} + +static SSmlSTableMeta* smlBuildSTableMeta(){ + SSmlSTableMeta* meta = taosMemoryCalloc(sizeof(SSmlSTableMeta), 1); + if(!meta){ + return NULL; + } + meta->tagHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false); + if (meta->tagHash == NULL) { + uError("SML:smlBuildSTableMeta failed to allocate memory"); + goto cleanup; + } + + meta->fieldHash = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false); + if (meta->fieldHash == NULL) { + uError("SML:smlBuildSTableMeta failed to allocate memory"); + goto cleanup; + } + return meta; + +cleanup: + taosMemoryFreeClear(meta); + return NULL; +} + +static void smlDestroySTableMeta(SSmlSTableMeta *meta){ + taosHashCleanup(meta->tagHash); + taosHashCleanup(meta->fieldHash); + taosMemoryFree(meta->tableMeta); +} + +static int32_t smlParseLine(SSmlHandle* info, const char* sql) { + SSmlLineInfo elements = {0}; + int ret = smlParseString(sql, &elements, &info->msgBuf); if(ret != TSDB_CODE_SUCCESS){ + uError("SML:0x%"PRIx64" smlParseString failed", info->id); return ret; } SArray *cols = taosArrayInit(16, POINTER_BYTES); if (cols == NULL) { - uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id); + uError("SML:0x%"PRIx64" smlParseLine failed to allocate memory", info->id); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - ret = parseSmlTS(elements.timestamp, elements.timestampLen, cols, info); + ret = smlParseTS(info, elements.timestamp, elements.timestampLen, cols); if(ret != TSDB_CODE_SUCCESS){ + uError("SML:0x%"PRIx64" smlParseTS failed", info->id); return ret; } - ret = parseSmlCols(elements.cols, elements.colsLen, cols, false, &info->msgBuf); + ret = smlParseCols(elements.cols, elements.colsLen, cols, false, &info->msgBuf); if(ret != TSDB_CODE_SUCCESS){ + uError("SML:0x%"PRIx64" smlParseCols parse cloums fields failed", info->id); return ret; } if(taosArrayGetSize(cols) > TSDB_MAX_COLUMNS){ - buildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL); + smlBuildInvalidDataMsg(&info->msgBuf, "too many columns than 4096", NULL); return TSDB_CODE_SML_INVALID_DATA; } - TAOS_SML_DATA_POINT_TAGS** oneTable = taosHashGet(info->childTables, elements.measure, elements.measureTagsLen); + SSmlTableInfo **oneTable = taosHashGet(info->childTables, elements.measure, elements.measureTagsLen); if(oneTable){ SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen); ASSERT(tableMeta); - ret = updateMeta(*tableMeta, NULL, cols, &info->msgBuf); // update meta + ret = smlUpdateMeta(*tableMeta, NULL, cols, &info->msgBuf); // update meta cols if(!ret){ + uError("SML:0x%"PRIx64" smlUpdateMeta cols failed", info->id); return TSDB_CODE_SML_INVALID_DATA; } - taosArrayPush((*oneTable)->cols, &cols); + ret = smlDealCols(*oneTable, info->dataFormat, cols); + if(ret != TSDB_CODE_SUCCESS){ + return ret; + } }else{ - TAOS_SML_DATA_POINT_TAGS *tag = taosMemoryCalloc(sizeof(TAOS_SML_DATA_POINT_TAGS), 1); + SSmlTableInfo *tag = smlBuildTableInfo(info->dataFormat); if(!tag){ - return TSDB_CODE_OUT_OF_MEMORY; - } - tag->cols = taosArrayInit(16, POINTER_BYTES); - if (tag->cols == NULL) { - uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id); return TSDB_CODE_TSC_OUT_OF_MEMORY; } - taosArrayPush(tag->cols, &cols); - - tag->colsColumn = taosArrayInit(16, POINTER_BYTES); - if (tag->cols == NULL) { - uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - - tag->tags = taosArrayInit(16, POINTER_BYTES); - if (tag->tags == NULL) { - uError("SML:0x%"PRIx64" taos_insert_lines failed to allocate memory", info->id); - return TSDB_CODE_TSC_OUT_OF_MEMORY; - } - ret = parseSmlCols(elements.tags, elements.tagsLen, tag->tags, true, &info->msgBuf); + ret = smlDealCols(tag, info->dataFormat, cols); if(ret != TSDB_CODE_SUCCESS){ return ret; } + ret = smlParseCols(elements.tags, elements.tagsLen, tag->tags, true, &info->msgBuf); + if(ret != TSDB_CODE_SUCCESS){ + uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id); + return ret; + } + if(taosArrayGetSize(tag->tags) > TSDB_MAX_TAGS){ - buildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL); + smlBuildInvalidDataMsg(&info->msgBuf, "too many tags than 128", NULL); return TSDB_CODE_SML_INVALID_DATA; } tag->sTableName = elements.measure; tag->sTableNameLen = elements.measureLen; - buildSmlChildTableName(tag); + smlBuildChildTableName(tag); + uDebug("SML:0x%"PRIx64" child table name: %s", info->id, tag->childTableName); SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, elements.measure, elements.measureLen); if(tableMeta){ // update meta - ret = updateMeta(*tableMeta, tag->tags, cols, &info->msgBuf); + ret = smlUpdateMeta(*tableMeta, tag->tags, cols, &info->msgBuf); if(!ret){ + uError("SML:0x%"PRIx64" smlUpdateMeta failed", info->id); return TSDB_CODE_SML_INVALID_DATA; } }else{ - SSmlSTableMeta* meta = taosMemoryCalloc(sizeof(SSmlSTableMeta), 1); - insertMeta(meta, tag->tags, cols); + SSmlSTableMeta *meta = smlBuildSTableMeta(); + smlInsertMeta(meta, tag->tags, cols); taosHashPut(info->superTables, elements.measure, elements.measureLen, &meta, POINTER_BYTES); } @@ -1344,114 +1498,165 @@ static int32_t smlParseLine(const char* sql, SSmlLinesInfo* info) { return TSDB_CODE_SUCCESS; } -static void smlDestroyInfo(SSmlLinesInfo* info){ +static void smlDestroyInfo(SSmlHandle* info){ if(!info) return; qDestroyQuery(info->pQuery); - tscSmlDestroyHandle(info->exec); + smlDestroyHandle(info->exec); + + // destroy info->childTables + void** p1 = taosHashIterate(info->childTables, NULL); + while (p1) { + SSmlTableInfo* oneTable = *p1; + smlDestroyBuildTableInfo(oneTable, info->dataFormat); + p1 = taosHashIterate(info->childTables, p1); + } taosHashCleanup(info->childTables); + + // destroy info->superTables + p1 = taosHashIterate(info->superTables, NULL); + while (p1) { + SSmlSTableMeta* oneTable = *p1; + smlDestroySTableMeta(oneTable); + p1 = taosHashIterate(info->superTables, p1); + } taosHashCleanup(info->superTables); - taosHashCleanup(info->metaHashObj); + + // destroy info->pVgHash taosHashCleanup(info->pVgHash); - taosMemoryFree(info); + + taosMemoryFreeClear(info); } -static SSmlLinesInfo* smlBuildInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int32_t tsType){ - SSmlLinesInfo* info = taosMemoryMalloc(sizeof(SSmlLinesInfo)); + +static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision, bool dataFormat){ + SSmlHandle* info = taosMemoryMalloc(sizeof(SSmlHandle)); if (NULL == info) { return NULL; } - info->id = genLinesSmlId(); - info->tsType = tsType; - info->taos = taos; - info->protocol = protocol; + info->id = smlGenId(); - info->pQuery = taosMemoryCalloc(1, sizeof(SQuery)); + info->pQuery = taosMemoryCalloc(1, sizeof(SQuery)); if (NULL == info->pQuery) { + uError("SML:0x%"PRIx64" create info->pQuery error", info->id); goto cleanup; } - info->pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; + info->pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; info->pQuery->haveResultSet = false; - info->pQuery->msgType = TDMT_VND_SUBMIT; - info->pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); + info->pQuery->msgType = TDMT_VND_SUBMIT; + info->pQuery->pRoot = (SNode*)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); + if(NULL == info->pQuery->pRoot){ + uError("SML:0x%"PRIx64" create info->pQuery->pRoot error", info->id); + goto cleanup; + } ((SVnodeModifOpStmt*)(info->pQuery->pRoot))->payloadType = PAYLOAD_TYPE_KV; - info->exec = tscSmlInitHandle(info->pQuery); - - int32_t code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog); + info->taos = taos; + int32_t code = catalogGetHandle(info->taos->pAppInfo->clusterId, &info->pCatalog); if(code != TSDB_CODE_SUCCESS){ uError("SML:0x%"PRIx64" get catalog error %d", info->id, code); goto cleanup; } - info->pRequest = request; - info->msgBuf.buf = info->pRequest->msgBuf; - info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE; + info->precision = precision; + info->protocol = protocol; + info->dataFormat = dataFormat; + info->pRequest = request; + info->msgBuf.buf = info->pRequest->msgBuf; + info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE; + + info->exec = smlInitHandle(info->pQuery); info->childTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false); info->superTables = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, false); - info->metaHashObj = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR), true, false); info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, false); - return info; + if(NULL == info->exec || NULL == info->childTables + || NULL == info->superTables || NULL == info->pVgHash){ + uError("SML:0x%"PRIx64" create info failed", info->id); + goto cleanup; + } + return info; cleanup: smlDestroyInfo(info); return NULL; } - -int sml_insert_lines(TAOS* taos, SRequestObj* request, char* lines[], int numLines, SMLProtocolType protocol, int32_t tsType) { +static int32_t smlInsertData(SSmlHandle* info) { int32_t code = TSDB_CODE_SUCCESS; - SSmlLinesInfo* info = smlBuildInfo(taos, request, protocol, tsType); - if(!info){ - code = TSDB_CODE_OUT_OF_MEMORY; - goto cleanup; + SSmlTableInfo** oneTable = taosHashIterate(info->childTables, NULL); + while (oneTable) { + SSmlTableInfo* tableData = *oneTable; + + SName pName = {TSDB_TABLE_NAME_T, info->taos->acctId, {0}, {0}}; + strcpy(pName.dbname, info->pRequest->pDb); + memcpy(pName.tname, tableData->childTableName, strlen(tableData->childTableName)); + SEpSet ep = getEpSet_s(&info->taos->pAppInfo->mgmtEp); + SVgroupInfo vg; + code = catalogGetTableHashVgroup(info->pCatalog, info->taos->pAppInfo->pTransporter, &ep, &pName, &vg); + if (code != 0) { + uError("SML:0x%"PRIx64" catalogGetTableHashVgroup failed. table name: %s", info->id, tableData->childTableName); + return code; + } + taosHashPut(info->pVgHash, (const char*)&vg.vgId, sizeof(vg.vgId), (char*)&vg, sizeof(vg)); + + SSmlSTableMeta** pMeta = taosHashGet(info->superTables, tableData->sTableName, tableData->sTableNameLen); + ASSERT (NULL != pMeta && NULL != *pMeta); + + (*pMeta)->tableMeta->vgId = vg.vgId; + (*pMeta)->tableMeta->uid = tableData->uid; // one table merge data block together according uid + + code = smlBindData(info->exec, tableData->tags, tableData->colsFormat, tableData->columnsHash, + tableData->cols, info->dataFormat, (*pMeta)->tableMeta, info->msgBuf.buf, info->msgBuf.len); + if(code != TSDB_CODE_SUCCESS){ + return code; + } + oneTable = taosHashIterate(info->childTables, oneTable); } + + smlBuildOutput(info->exec, info->pVgHash); + launchQueryImpl(info->pRequest, info->pQuery, TSDB_CODE_SUCCESS, true); + + info->affectedRows = taos_affected_rows(info->pRequest); + return info->pRequest->code; +} + +static int smlInsertLines(SSmlHandle *info, char* lines[], int numLines) { + int32_t code = TSDB_CODE_SUCCESS; + if (numLines <= 0 || numLines > 65536) { - uError("SML:0x%"PRIx64" taos_insert_lines numLines should be between 1 and 65536. numLines: %d", info->id, numLines); + uError("SML:0x%"PRIx64" smlInsertLines numLines should be between 1 and 65536. numLines: %d", info->id, numLines); code = TSDB_CODE_TSC_APP_ERROR; goto cleanup; } + for (int32_t i = 0; i < numLines; ++i) { - code = smlParseLine(lines[i], info); + code = smlParseLine(info, lines[i]); if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%"PRIx64" data point line parse failed. line %d : %s", info->id, i, lines[i]); + uError("SML:0x%"PRIx64" smlParseLine failed. line %d : %s", info->id, i, lines[i]); goto cleanup; } } - uDebug("SML:0x%"PRIx64" data point line parse success. tables %d", info->id, taosHashGetSize(info->childTables)); + uDebug("SML:0x%"PRIx64" smlInsertLines parse success. tables %d", info->id, taosHashGetSize(info->childTables)); + uDebug("SML:0x%"PRIx64" smlInsertLines parse success. super tables %d", info->id, taosHashGetSize(info->superTables)); - code = smlInsert(taos, info); - if (code != TSDB_CODE_SUCCESS) { - uError("SML:0x%"PRIx64" taos_sml_insert error: %s", info->id, tstrerror((code))); + code = smlModifyDBSchemas(info); + if (code != 0) { + uError("SML:0x%"PRIx64" smlModifyDBSchemas error : %s", info->id, tstrerror(code)); goto cleanup; } - uDebug("SML:0x%"PRIx64" taos_insert_lines finish inserting %d lines. code: %d", info->id, numLines, code); + code = smlInsertData(info); + if (code != 0) { + uError("SML:0x%"PRIx64" smlInsertData error : %s", info->id, tstrerror(code)); + goto cleanup; + } + + uDebug("SML:0x%"PRIx64" smlInsertLines finish inserting %d lines.", info->id, numLines); cleanup: smlDestroyInfo(info); return code; } -static int32_t convertPrecisionType(int precision) { - switch (precision) { - case TSDB_SML_TIMESTAMP_HOURS: - return TSDB_TIME_PRECISION_HOURS; - case TSDB_SML_TIMESTAMP_MILLI_SECONDS: - return TSDB_TIME_PRECISION_MILLI; - case TSDB_SML_TIMESTAMP_NANO_SECONDS: - case TSDB_SML_TIMESTAMP_NOT_CONFIGURED: - return TSDB_TIME_PRECISION_NANO; - case TSDB_SML_TIMESTAMP_MICRO_SECONDS: - return TSDB_TIME_PRECISION_MICRO; - case TSDB_SML_TIMESTAMP_SECONDS: - return TSDB_TIME_PRECISION_SECONDS; - case TSDB_SML_TIMESTAMP_MINUTES: - return TSDB_TIME_PRECISION_MINUTES; - default: - return -1; - } -} - /** * taos_schemaless_insert() parse and insert data points into database according to * different protocol. @@ -1473,19 +1678,20 @@ static int32_t convertPrecisionType(int precision) { * */ -TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision) { - int code = TSDB_CODE_SUCCESS; - +TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int protocol, int precision, bool dataFormat) { SRequestObj* request = createRequest(taos, NULL, NULL, TSDB_SQL_INSERT); + if(!request){ + goto end; + } + + SSmlHandle* info = smlBuildSmlInfo(taos, request, protocol, precision, dataFormat); + if(!info){ + goto end; + } + switch (protocol) { case TSDB_SML_LINE_PROTOCOL:{ - int32_t tsType = convertPrecisionType(precision); - if(tsType == -1){ - request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE; - goto end; - } - - code = sml_insert_lines(taos, request, lines, numLines, protocol, tsType); + smlInsertLines(info, lines, numLines); break; } case TSDB_SML_TELNET_PROTOCOL: @@ -1495,7 +1701,6 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr //code = taos_insert_json_payload(taos, *lines, protocol, tsType, &affected_rows); break; default: - code = TSDB_CODE_SML_INVALID_PROTOCOL_TYPE; break; } diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 54e05f9264..d59473b26b 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -1514,6 +1514,7 @@ int32_t qBuildStmtColFields(void *pBlock, int32_t *fieldNum, TAOS_FIELD** fields return TSDB_CODE_SUCCESS; } +// schemaless logic start typedef struct SmlExecHandle { SHashObj* pBlockHash; @@ -1523,7 +1524,7 @@ typedef struct SmlExecHandle { SVCreateTbReq createTblReq; // each table SQuery* pQuery; -} SmlExecHandle; +} SSmlExecHandle; static int32_t smlBoundColumns(SArray *cols, SParsedDataColInfo* pColList, SSchema* pSchema) { col_id_t nCols = pColList->numOfCols; @@ -1620,14 +1621,15 @@ static int32_t smlParseTags(SArray *cols, SKVRowBuilder *tagsBuilder, SParsedDat return TSDB_CODE_SUCCESS; } -int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta, char *msgBuf, int16_t msgBufLen) { +int32_t smlBindData(void *handle, SArray *tags, SArray *colsFormat, SHashObj *colsHash, SArray *cols, bool format, + STableMeta *pTableMeta, char *msgBuf, int16_t msgBufLen) { SMsgBuf pBuf = {.buf = msgBuf, .len = msgBufLen}; int32_t rowNum = taosArrayGetSize(cols); if(rowNum <= 0) { return buildInvalidOperationMsg(&pBuf, "cols size <= 0"); } - SmlExecHandle *smlHandle = (SmlExecHandle *)handle; + SSmlExecHandle *smlHandle = (SSmlExecHandle *)handle; SSchema* pTagsSchema = getTableTagSchema(pTableMeta); setBoundColumnInfo(&smlHandle->tags, pTagsSchema, getNumOfTags(pTableMeta)); int ret = smlBoundColumns(tags, &smlHandle->tags, pTagsSchema); @@ -1651,7 +1653,21 @@ int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta SSchema* pSchema = getTableColumnSchema(pTableMeta); - ret = smlBoundColumns(taosArrayGetP(cols, 0), &pDataBlock->boundColumnInfo, pSchema); + + if(format){ + ret = smlBoundColumns(taosArrayGetP(colsFormat, 0), &pDataBlock->boundColumnInfo, pSchema); + }else{ + SArray *columns = taosArrayInit(16, POINTER_BYTES); + void **p1 = taosHashIterate(colsHash, NULL); + while (p1) { + SSmlKv* kv = *p1; + taosArrayPush(columns, &kv); + p1 = taosHashIterate(colsHash, p1); + } + ret = smlBoundColumns(columns, &pDataBlock->boundColumnInfo, pSchema); + taosArrayDestroy(columns); + } + if(ret != TSDB_CODE_SUCCESS){ buildInvalidOperationMsg(&pBuf, "bound cols error"); return ret; @@ -1671,7 +1687,12 @@ int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta for (int32_t r = 0; r < rowNum; ++r) { STSRow* row = (STSRow*)(pDataBlock->pData + pDataBlock->size); // skip the SSubmitBlk header tdSRowResetBuf(pBuilder, row); - SArray *rowData = taosArrayGetP(cols, r); + void *rowData = NULL; + if(format){ + rowData = taosArrayGetP(colsFormat, r); + }else{ + rowData = taosArrayGetP(cols, r); + } // 1. set the parsed value from sql string for (int c = 0; c < spd->numOfBound; ++c) { @@ -1680,7 +1701,18 @@ int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta param.schema = pColSchema; getSTSRowAppendInfo(pBuilder->rowType, spd, c, ¶m.toffset, ¶m.colIdx); - SSmlKv *kv = taosArrayGetP(rowData, c); + SSmlKv *kv = NULL; + if(format){ + kv = taosArrayGetP(rowData, c); + if (!kv){ + char msg[64] = {0}; + sprintf(msg, "cols num not the same like before:%d", r); + return buildInvalidOperationMsg(&pBuf, msg); + } + }else{ + void **p =taosHashGet(rowData, pColSchema->name, strlen(pColSchema->name)); + kv = *p; + } if (kv->valueLen == 0) { MemRowAppend(&pBuf, NULL, 0, ¶m); @@ -1720,23 +1752,25 @@ int32_t smlBind(void *handle, SArray *tags, SArray *cols, STableMeta *pTableMeta return TSDB_CODE_SUCCESS; } -void* tscSmlInitHandle(SQuery *pQuery){ - SmlExecHandle *handle = taosMemoryCalloc(sizeof(SmlExecHandle)); +void* smlInitHandle(SQuery *pQuery){ + SSmlExecHandle *handle = taosMemoryCalloc(1, sizeof(SSmlExecHandle)); + if(!handle) return NULL; handle->pBlockHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, false); handle->pQuery = pQuery; return handle; } -void tscSmlDestroyHandle(void *pHandle){ +void smlDestroyHandle(void *pHandle){ if(!pHandle) return; - SmlExecHandle *handle = (SmlExecHandle *)pHandle; - taosHashCleanup(handle->pBlockHash); + SSmlExecHandle *handle = (SSmlExecHandle *)pHandle; + destroyBlockHashmap(handle->pBlockHash); taosMemoryFree(handle); } int32_t smlBuildOutput(void* handle, SHashObj* pVgHash) { - SmlExecHandle *smlHandle = (SmlExecHandle *)handle; + SSmlExecHandle *smlHandle = (SSmlExecHandle *)handle; return qBuildStmtOutput(smlHandle->pQuery, pVgHash, smlHandle->pBlockHash); } +// schemaless logic end