Merge pull request #12557 from taosdata/feature/TD-14761

refactor: add the configuration of child table name
This commit is contained in:
WANG MINGMING 2022-05-17 16:42:52 +08:00 committed by GitHub
commit a9f67d004f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 291 additions and 145 deletions

View File

@ -125,6 +125,10 @@ extern SDiskCfg tsDiskCfg[];
// udf // udf
extern bool tsStartUdfd; extern bool tsStartUdfd;
// schemaless
extern char tsSmlChildTableName[];
extern bool tsSmlDataFormat;
// internal // internal
extern int32_t tsTransPullupInterval; extern int32_t tsTransPullupInterval;
extern int32_t tsMqRebalanceInterval; extern int32_t tsMqRebalanceInterval;

View File

@ -16,6 +16,7 @@
#include "clientInt.h" #include "clientInt.h"
#include "tname.h" #include "tname.h"
#include "cJSON.h" #include "cJSON.h"
#include "tglobal.h"
//================================================================================================= //=================================================================================================
#define SPACE ' ' #define SPACE ' '
@ -54,6 +55,9 @@ for (int i = 1; i < keyLen; ++i) { \
} \ } \
} }
#define IS_INVALID_COL_LEN(len) ((len) <= 0 || (len) >= TSDB_COL_NAME_LEN)
#define IS_INVALID_TABLE_LEN(len) ((len) <= 0 || (len) >= TSDB_TABLE_NAME_LEN)
#define OTD_MAX_FIELDS_NUM 2 #define OTD_MAX_FIELDS_NUM 2
#define OTD_JSON_SUB_FIELDS_NUM 2 #define OTD_JSON_SUB_FIELDS_NUM 2
#define OTD_JSON_FIELDS_NUM 4 #define OTD_JSON_FIELDS_NUM 4
@ -162,7 +166,7 @@ typedef struct {
SMLProtocolType protocol; SMLProtocolType protocol;
int8_t precision; int8_t precision;
bool dataFormat; // true means that the name, number and order of keys in each line are the same(only for influx protocol) bool dataFormat; // true means that the name and order of keys in each line are the same(only for influx protocol)
SHashObj *childTables; SHashObj *childTables;
SHashObj *superTables; SHashObj *superTables;
@ -594,19 +598,25 @@ static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg){
kvVal->type = TSDB_DATA_TYPE_FLOAT; kvVal->type = TSDB_DATA_TYPE_FLOAT;
kvVal->f = (float)result; kvVal->f = (float)result;
}else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)){ }else if ((left == 1 && *endptr == 'i') || (left == 3 && strncasecmp(endptr, "i64", left) == 0)){
if(smlDoubleToInt64OverFlow(result)){ if(result >= (double)INT64_MAX){
smlBuildInvalidDataMsg(msg, "big int is too large, out of precision", pVal); kvVal->i = INT64_MAX;
return false; }else if(result <= (double)INT64_MIN){
kvVal->i = INT64_MIN;
}else{
kvVal->i = result;
} }
kvVal->type = TSDB_DATA_TYPE_BIGINT; kvVal->type = TSDB_DATA_TYPE_BIGINT;
kvVal->i = (int64_t)result;
}else if ((left == 3 && strncasecmp(endptr, "u64", left) == 0)){ }else if ((left == 3 && strncasecmp(endptr, "u64", left) == 0)){
if(result >= (double)UINT64_MAX || result < 0){ if(result < 0){
smlBuildInvalidDataMsg(msg, "unsigned big int is too large, out of precision", pVal); smlBuildInvalidDataMsg(msg, "unsigned big int is too large, out of precision", pVal);
return false; return false;
} }
kvVal->type = TSDB_DATA_TYPE_UBIGINT; if(result >= (double)UINT64_MAX){
kvVal->u = UINT64_MAX;
}else{
kvVal->u = result; kvVal->u = result;
}
kvVal->type = TSDB_DATA_TYPE_UBIGINT;
}else if (left == 3 && strncasecmp(endptr, "i32", left) == 0){ }else if (left == 3 && strncasecmp(endptr, "i32", left) == 0){
if(!IS_VALID_INT(result)){ if(!IS_VALID_INT(result)){
smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal); smlBuildInvalidDataMsg(msg, "int out of range[-2147483648,2147483647]", pVal);
@ -659,12 +669,12 @@ static bool smlParseNumber(SSmlKv *kvVal, SSmlMsgBuf *msg){
static bool smlParseBool(SSmlKv *kvVal) { static bool smlParseBool(SSmlKv *kvVal) {
const char *pVal = kvVal->value; const char *pVal = kvVal->value;
int32_t len = kvVal->length; int32_t len = kvVal->length;
if ((len == 1) && pVal[0] == 't') { if ((len == 1) && (pVal[0] == 't' || pVal[0] == 'T')) {
kvVal->i = true; kvVal->i = true;
return true; return true;
} }
if ((len == 1) && pVal[0] == 'f') { if ((len == 1) && (pVal[0] == 'f' || pVal[0] == 'F')) {
kvVal->i = false; kvVal->i = false;
return true; return true;
} }
@ -899,8 +909,8 @@ static int32_t smlParseInfluxString(const char* sql, SSmlLineInfo *elements, SSm
sql++; sql++;
} }
elements->measureLen = sql - elements->measure; elements->measureLen = sql - elements->measure;
if(elements->measureLen == 0) { if(IS_INVALID_TABLE_LEN(elements->measureLen)) {
smlBuildInvalidDataMsg(msg, "measure is empty", NULL); smlBuildInvalidDataMsg(msg, "measure is empty or too large than 192", NULL);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
@ -969,8 +979,9 @@ static void smlParseTelnetElement(const char **sql, const char **data, int32_t *
} }
} }
static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dumplicateKey, SSmlMsgBuf *msg){ static int32_t smlParseTelnetTags(const char* data, SArray *cols, char *childTableName, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
const char *sql = data; const char *sql = data;
size_t childTableNameLen = strlen(tsSmlChildTableName);
while(*sql != '\0'){ while(*sql != '\0'){
JUMP_SPACE(sql) JUMP_SPACE(sql)
if(*sql == '\0') break; if(*sql == '\0') break;
@ -992,7 +1003,7 @@ static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dump
sql++; sql++;
} }
if(keyLen == 0 || keyLen >= TSDB_COL_NAME_LEN){ if(IS_INVALID_COL_LEN(keyLen)){
smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key); smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
@ -1022,6 +1033,13 @@ static int32_t smlParseTelnetTags(const char* data, SArray *cols, SHashObj *dump
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
//handle child table name
if(childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0){
memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN));
continue;
}
// add kv to SSmlKv // add kv to SSmlKv
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv) return TSDB_CODE_OUT_OF_MEMORY; if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
@ -1043,7 +1061,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable
// parse metric // parse metric
smlParseTelnetElement(&sql, &tinfo->sTableName, &tinfo->sTableNameLen); smlParseTelnetElement(&sql, &tinfo->sTableName, &tinfo->sTableNameLen);
if (!(tinfo->sTableName) || tinfo->sTableNameLen == 0) { if (!(tinfo->sTableName) || IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
@ -1085,7 +1103,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable
} }
// parse tags // parse tags
ret = smlParseTelnetTags(sql, tinfo->tags, info->dumplicateKey, &info->msgBuf); ret = smlParseTelnetTags(sql, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql); smlBuildInvalidDataMsg(&info->msgBuf, "invalid data", sql);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
@ -1094,7 +1112,7 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, char *childTableName, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
if(isTag && len == 0){ if(isTag && len == 0){
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv) return TSDB_CODE_OUT_OF_MEMORY; if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
@ -1107,6 +1125,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
size_t childTableNameLen = strlen(tsSmlChildTableName);
const char *sql = data; const char *sql = data;
while(sql < data + len){ while(sql < data + len){
const char *key = sql; const char *key = sql;
@ -1126,7 +1145,7 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
sql++; sql++;
} }
if(keyLen == 0 || keyLen >= TSDB_COL_NAME_LEN){ if(IS_INVALID_COL_LEN(keyLen)){
smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key); smlBuildInvalidDataMsg(msg, "invalid key or key is too long than 64", key);
return TSDB_CODE_SML_INVALID_DATA; return TSDB_CODE_SML_INVALID_DATA;
} }
@ -1169,6 +1188,13 @@ static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, bool is
PROCESS_SLASH(key, keyLen) PROCESS_SLASH(key, keyLen)
PROCESS_SLASH(value, valueLen) PROCESS_SLASH(value, valueLen)
//handle child table name
if(childTableName && childTableNameLen != 0 && strncmp(key, tsSmlChildTableName, keyLen) == 0){
memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
strncpy(childTableName, value, (valueLen < TSDB_TABLE_NAME_LEN ? valueLen : TSDB_TABLE_NAME_LEN));
continue;
}
// add kv to SSmlKv // add kv to SSmlKv
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv) return TSDB_CODE_OUT_OF_MEMORY; if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
@ -1396,7 +1422,7 @@ static void smlDestroyInfo(SSmlHandle* info){
taosMemoryFreeClear(info); taosMemoryFreeClear(info);
} }
static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision, bool dataFormat){ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocolType protocol, int8_t precision){
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
SSmlHandle* info = (SSmlHandle*)taosMemoryCalloc(1, sizeof(SSmlHandle)); SSmlHandle* info = (SSmlHandle*)taosMemoryCalloc(1, sizeof(SSmlHandle));
if (NULL == info) { if (NULL == info) {
@ -1428,7 +1454,11 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol
info->precision = precision; info->precision = precision;
info->protocol = protocol; info->protocol = protocol;
info->dataFormat = dataFormat; if(protocol == TSDB_SML_LINE_PROTOCOL){
info->dataFormat = tsSmlDataFormat;
}else{
info->dataFormat = true;
}
info->pRequest = request; info->pRequest = request;
info->msgBuf.buf = info->pRequest->msgBuf; info->msgBuf.buf = info->pRequest->msgBuf;
info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE; info->msgBuf.len = ERROR_MSG_BUF_DEFAULT_SIZE;
@ -1439,7 +1469,7 @@ static SSmlHandle* smlBuildSmlInfo(TAOS* taos, SRequestObj* request, SMLProtocol
info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); info->pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); info->dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK);
if(!dataFormat){ if(!info->dataFormat){
info->colsContainer = taosArrayInit(32, POINTER_BYTES); info->colsContainer = taosArrayInit(32, POINTER_BYTES);
if(NULL == info->colsContainer){ if(NULL == info->colsContainer){
uError("SML:0x%"PRIx64" create info failed", info->id); uError("SML:0x%"PRIx64" create info failed", info->id);
@ -1477,8 +1507,8 @@ static int32_t smlParseMetricFromJSON(SSmlHandle *info, cJSON *root, SSmlTableIn
} }
tinfo->sTableNameLen = strlen(metric->valuestring); tinfo->sTableNameLen = strlen(metric->valuestring);
if (tinfo->sTableNameLen >= TSDB_TABLE_NAME_LEN) { if (IS_INVALID_TABLE_LEN(tinfo->sTableNameLen)) {
uError("OTD:0x%"PRIx64" Metric cannot exceeds %d characters in JSON", info->id, TSDB_TABLE_NAME_LEN - 1); uError("OTD:0x%"PRIx64" Metric lenght is 0 or large than 192", info->id);
return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH; return TSDB_CODE_TSC_INVALID_TABLE_ID_LENGTH;
} }
@ -1674,11 +1704,13 @@ static int32_t smlConvertJSONNumber(SSmlKv *pVal, char* typeStr, cJSON *value) {
strcasecmp(typeStr, "bigint") == 0) { strcasecmp(typeStr, "bigint") == 0) {
pVal->type = TSDB_DATA_TYPE_BIGINT; pVal->type = TSDB_DATA_TYPE_BIGINT;
pVal->length = (int16_t)tDataTypes[pVal->type].bytes; pVal->length = (int16_t)tDataTypes[pVal->type].bytes;
if(smlDoubleToInt64OverFlow(value->valuedouble)){ if(value->valuedouble >= (double)INT64_MAX){
uError("OTD:JSON value(%f) cannot fit in type(big int)", value->valuedouble); pVal->i = INT64_MAX;
return TSDB_CODE_TSC_VALUE_OUT_OF_RANGE; }else if(value->valuedouble <= (double)INT64_MIN){
} pVal->i = INT64_MIN;
}else{
pVal->i = value->valuedouble; pVal->i = value->valuedouble;
}
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
//float //float
@ -1828,60 +1860,49 @@ static int32_t smlParseColsFromJSON(cJSON *root, SArray *cols) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, SHashObj *dumplicateKey, SSmlMsgBuf *msg) { static int32_t smlParseTagsFromJSON(cJSON *root, SArray *pKVs, char *childTableName, SHashObj *dumplicateKey, SSmlMsgBuf *msg) {
int32_t ret = TSDB_CODE_SUCCESS; int32_t ret = TSDB_CODE_SUCCESS;
cJSON *tags = cJSON_GetObjectItem(root, "tags"); cJSON *tags = cJSON_GetObjectItem(root, "tags");
if (tags == NULL || tags->type != cJSON_Object) { if (tags == NULL || tags->type != cJSON_Object) {
return TSDB_CODE_TSC_INVALID_JSON; return TSDB_CODE_TSC_INVALID_JSON;
} }
//handle child table name todo
// size_t childTableNameLen = strlen(tsSmlChildTableName);
// char childTbName[TSDB_TABLE_NAME_LEN] = {0};
// if (childTableNameLen != 0) {
// memcpy(childTbName, tsSmlChildTableName, childTableNameLen);
// cJSON *id = cJSON_GetObjectItem(tags, childTbName);
// if (id != NULL) {
// if (!cJSON_IsString(id)) {
// tscError("OTD:0x%"PRIx64" ID must be JSON string", info->id);
// return TSDB_CODE_TSC_INVALID_JSON;
// }
// size_t idLen = strlen(id->valuestring);
// *childTableName = tcalloc(idLen + TS_BACKQUOTE_CHAR_SIZE + 1, sizeof(char));
// memcpy(*childTableName, id->valuestring, idLen);
// addEscapeCharToString(*childTableName, (int32_t)idLen);
//
// //check duplicate IDs
// cJSON_DeleteItemFromObject(tags, childTbName);
// id = cJSON_GetObjectItem(tags, childTbName);
// if (id != NULL) {
// return TSDB_CODE_TSC_DUP_TAG_NAMES;
// }
// }
// }
size_t childTableNameLen = strlen(tsSmlChildTableName);
int32_t tagNum = cJSON_GetArraySize(tags); int32_t tagNum = cJSON_GetArraySize(tags);
for (int32_t i = 0; i < tagNum; ++i) { for (int32_t i = 0; i < tagNum; ++i) {
cJSON *tag = cJSON_GetArrayItem(tags, i); cJSON *tag = cJSON_GetArrayItem(tags, i);
if (tag == NULL) { if (tag == NULL) {
return TSDB_CODE_TSC_INVALID_JSON; return TSDB_CODE_TSC_INVALID_JSON;
} }
size_t keyLen = strlen(tag->string);
if (IS_INVALID_COL_LEN(keyLen)) {
uError("OTD:Tag key length is 0 or too large than 64");
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
//check duplicate keys //check duplicate keys
if (smlCheckDuplicateKey(tag->string, strlen(tag->string), dumplicateKey)) { if (smlCheckDuplicateKey(tag->string, keyLen, dumplicateKey)) {
return TSDB_CODE_TSC_DUP_TAG_NAMES; return TSDB_CODE_TSC_DUP_TAG_NAMES;
} }
//handle child table name
if(childTableNameLen != 0 && strcmp(tag->string, tsSmlChildTableName) == 0){
if (!cJSON_IsString(tag)) {
uError("OTD:ID must be JSON string");
return TSDB_CODE_TSC_INVALID_JSON;
}
memset(childTableName, 0, TSDB_TABLE_NAME_LEN);
strncpy(childTableName, tag->valuestring, TSDB_TABLE_NAME_LEN);
continue;
}
// add kv to SSmlKv // add kv to SSmlKv
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1); SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
if(!kv) return TSDB_CODE_OUT_OF_MEMORY; if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
if(pKVs) taosArrayPush(pKVs, &kv); if(pKVs) taosArrayPush(pKVs, &kv);
//key //key
kv->keyLen = strlen(tag->string); kv->keyLen = keyLen;
if (kv->keyLen >= TSDB_COL_NAME_LEN) {
uError("OTD:Tag key cannot exceeds %d characters in JSON", TSDB_COL_NAME_LEN - 1);
return TSDB_CODE_TSC_INVALID_COLUMN_LENGTH;
}
ret = smlJsonCreateSring(&kv->key, tag->string, kv->keyLen); ret = smlJsonCreateSring(&kv->key, tag->string, kv->keyLen);
if (ret != TSDB_CODE_SUCCESS) { if (ret != TSDB_CODE_SUCCESS) {
return ret; return ret;
@ -1937,7 +1958,7 @@ static int32_t smlParseJSONString(SSmlHandle *info, cJSON *root, SSmlTableInfo *
uDebug("OTD:0x%"PRIx64" Parse metric value from JSON payload finished", info->id); uDebug("OTD:0x%"PRIx64" Parse metric value from JSON payload finished", info->id);
//Parse tags //Parse tags
ret = smlParseTagsFromJSON(root, tinfo->tags, info->dumplicateKey, &info->msgBuf); ret = smlParseTagsFromJSON(root, tinfo->tags, tinfo->childTableName, info->dumplicateKey, &info->msgBuf);
if (ret) { if (ret) {
uError("OTD:0x%"PRIx64" Unable to parse tags from JSON payload", info->id); uError("OTD:0x%"PRIx64" Unable to parse tags from JSON payload", info->id);
return ret; return ret;
@ -1975,7 +1996,7 @@ static int32_t smlParseInfluxLine(SSmlHandle* info, const char* sql) {
if(info->dataFormat) taosArrayDestroy(cols); if(info->dataFormat) taosArrayDestroy(cols);
return ret; return ret;
} }
ret = smlParseCols(elements.cols, elements.colsLen, cols, false, info->dumplicateKey, &info->msgBuf); ret = smlParseCols(elements.cols, elements.colsLen, cols, NULL, false, info->dumplicateKey, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseCols parse cloums fields failed", info->id); uError("SML:0x%"PRIx64" smlParseCols parse cloums fields failed", info->id);
smlDestroyCols(cols); smlDestroyCols(cols);
@ -2006,7 +2027,7 @@ static int32_t smlParseInfluxLine(SSmlHandle* info, const char* sql) {
} }
if(!hasTable){ if(!hasTable){
ret = smlParseCols(elements.tags, elements.tagsLen, (*oneTable)->tags, true, info->dumplicateKey, &info->msgBuf); ret = smlParseCols(elements.tags, elements.tagsLen, (*oneTable)->tags, (*oneTable)->childTableName, true, info->dumplicateKey, &info->msgBuf);
if(ret != TSDB_CODE_SUCCESS){ if(ret != TSDB_CODE_SUCCESS){
uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id); uError("SML:0x%"PRIx64" smlParseCols parse tag fields failed", info->id);
return ret; return ret;
@ -2019,11 +2040,16 @@ static int32_t smlParseInfluxLine(SSmlHandle* info, const char* sql) {
(*oneTable)->sTableName = elements.measure; (*oneTable)->sTableName = elements.measure;
(*oneTable)->sTableNameLen = elements.measureLen; (*oneTable)->sTableNameLen = elements.measureLen;
if(strlen((*oneTable)->childTableName) == 0){
RandTableName rName = { (*oneTable)->tags, (*oneTable)->sTableName, (uint8_t)(*oneTable)->sTableNameLen, RandTableName rName = { (*oneTable)->tags, (*oneTable)->sTableName, (uint8_t)(*oneTable)->sTableNameLen,
(*oneTable)->childTableName, 0 }; (*oneTable)->childTableName, 0 };
buildChildTableName(&rName); buildChildTableName(&rName);
(*oneTable)->uid = rName.uid; (*oneTable)->uid = rName.uid;
}else{
(*oneTable)->uid = *(uint64_t*)((*oneTable)->childTableName);
}
} }
SSmlSTableMeta** tableMeta = (SSmlSTableMeta**)taosHashGet(info->superTables, elements.measure, elements.measureLen); SSmlSTableMeta** tableMeta = (SSmlSTableMeta**)taosHashGet(info->superTables, elements.measure, elements.measureLen);
@ -2087,10 +2113,15 @@ static int32_t smlParseTelnetLine(SSmlHandle* info, void *data) {
} }
taosHashClear(info->dumplicateKey); taosHashClear(info->dumplicateKey);
if(strlen(tinfo->childTableName) == 0){
RandTableName rName = { tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen, RandTableName rName = { tinfo->tags, tinfo->sTableName, (uint8_t)tinfo->sTableNameLen,
tinfo->childTableName, 0 }; tinfo->childTableName, 0 };
buildChildTableName(&rName); buildChildTableName(&rName);
tinfo->uid = rName.uid; tinfo->uid = rName.uid;
}else{
tinfo->uid = *(uint64_t*)(tinfo->childTableName); // generate uid by name simple
}
bool hasTable = true; bool hasTable = true;
SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName)); SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName));
@ -2308,14 +2339,14 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
return NULL; return NULL;
} }
SSmlHandle* info = smlBuildSmlInfo(taos, request, (SMLProtocolType)protocol, precision, true); SSmlHandle* info = smlBuildSmlInfo(taos, request, (SMLProtocolType)protocol, precision);
if(!info){ if(!info){
return (TAOS_RES*)request; return (TAOS_RES*)request;
} }
if (numLines <= 0 || numLines > 65536) { if (!lines) {
request->code = TSDB_CODE_SML_INVALID_DATA; request->code = TSDB_CODE_SML_INVALID_DATA;
smlBuildInvalidDataMsg(&info->msgBuf, "numLines should be between 1 and 65536", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "lines is null", NULL);
goto end; goto end;
} }
@ -2325,7 +2356,7 @@ TAOS_RES* taos_schemaless_insert(TAOS* taos, char* lines[], int numLines, int pr
goto end; goto end;
} }
if(protocol == TSDB_SML_LINE_PROTOCOL && (precision < TSDB_SML_TIMESTAMP_HOURS || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)){ if(protocol == TSDB_SML_LINE_PROTOCOL && (precision < TSDB_SML_TIMESTAMP_NOT_CONFIGURED || precision > TSDB_SML_TIMESTAMP_NANO_SECONDS)){
request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE; request->code = TSDB_CODE_SML_INVALID_PRECISION_TYPE;
smlBuildInvalidDataMsg(&info->msgBuf, "precision invalidate for line protocol", NULL); smlBuildInvalidDataMsg(&info->msgBuf, "precision invalidate for line protocol", NULL);
goto end; goto end;

View File

@ -41,3 +41,7 @@ TARGET_INCLUDE_DIRECTORIES(
PRIVATE "${TD_SOURCE_DIR}/source/client/inc" PRIVATE "${TD_SOURCE_DIR}/source/client/inc"
) )
#add_test(
# NAME smlTest
# COMMAND smlTest
#)

View File

@ -207,7 +207,7 @@ TEST(testCase, smlParseCols_Error_Test) {
char *sql = (char*)taosMemoryCalloc(256, 1); char *sql = (char*)taosMemoryCalloc(256, 1);
memcpy(sql, data[i], len + 1); memcpy(sql, data[i], len + 1);
SArray *cols = taosArrayInit(8, POINTER_BYTES); SArray *cols = taosArrayInit(8, POINTER_BYTES);
int32_t ret = smlParseCols(sql, len, cols, false, dumplicateKey, &msgBuf); int32_t ret = smlParseCols(sql, len, cols, NULL, false, dumplicateKey, &msgBuf);
ASSERT_NE(ret, TSDB_CODE_SUCCESS); ASSERT_NE(ret, TSDB_CODE_SUCCESS);
taosHashClear(dumplicateKey); taosHashClear(dumplicateKey);
taosMemoryFree(sql); taosMemoryFree(sql);
@ -233,7 +233,7 @@ TEST(testCase, smlParseCols_tag_Test) {
const char *data = const char *data =
"cbin=\"passit helloc\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf64_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\""; "cbin=\"passit helloc\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf64_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\"";
int32_t len = strlen(data); int32_t len = strlen(data);
int32_t ret = smlParseCols(data, len, cols, true, dumplicateKey, &msgBuf); int32_t ret = smlParseCols(data, len, cols, NULL, true, dumplicateKey, &msgBuf);
ASSERT_EQ(ret, TSDB_CODE_SUCCESS); ASSERT_EQ(ret, TSDB_CODE_SUCCESS);
int32_t size = taosArrayGetSize(cols); int32_t size = taosArrayGetSize(cols);
ASSERT_EQ(size, 19); ASSERT_EQ(size, 19);
@ -265,7 +265,7 @@ TEST(testCase, smlParseCols_tag_Test) {
len = 0; len = 0;
memset(msgBuf.buf, 0, msgBuf.len); memset(msgBuf.buf, 0, msgBuf.len);
taosHashClear(dumplicateKey); taosHashClear(dumplicateKey);
ret = smlParseCols(data, len, cols, true, dumplicateKey, &msgBuf); ret = smlParseCols(data, len, cols, NULL, true, dumplicateKey, &msgBuf);
ASSERT_EQ(ret, TSDB_CODE_SUCCESS); ASSERT_EQ(ret, TSDB_CODE_SUCCESS);
size = taosArrayGetSize(cols); size = taosArrayGetSize(cols);
ASSERT_EQ(size, 1); ASSERT_EQ(size, 1);
@ -298,7 +298,7 @@ TEST(testCase, smlParseCols_Test) {
int32_t len = strlen(data); int32_t len = strlen(data);
char *sql = (char*)taosMemoryCalloc(1024, 1); char *sql = (char*)taosMemoryCalloc(1024, 1);
memcpy(sql, data, len + 1); memcpy(sql, data, len + 1);
int32_t ret = smlParseCols(sql, len, cols, false, dumplicateKey, &msgBuf); int32_t ret = smlParseCols(sql, len, cols, NULL, false, dumplicateKey, &msgBuf);
ASSERT_EQ(ret, TSDB_CODE_SUCCESS); ASSERT_EQ(ret, TSDB_CODE_SUCCESS);
int32_t size = taosArrayGetSize(cols); int32_t size = taosArrayGetSize(cols);
ASSERT_EQ(size, 19); ASSERT_EQ(size, 19);
@ -488,44 +488,118 @@ TEST(testCase, smlProcess_influx_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr); ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db"); TAOS_RES* pRes = taos_query(taos, "create database if not exists inflx_db");
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db"); pRes = taos_query(taos, "use inflx_db");
taos_free_result(pRes); taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr); ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr); ASSERT_NE(info, nullptr);
const char *sql[] = { const char *sql[] = {
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606400000000000", "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0 1451606401000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451607400000000000", "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451607402000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608400000000000", "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 load_capacity=1500,fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,heading=221,grade=0,fuel_consumption=25 1451608403000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451609400000000000", "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_capacity=150,nominal_fuel_consumption=12,latitude=52.31854,longitude=4.72037,elevation=124,velocity=0,heading=221,grade=0,fuel_consumption=25 1451609404000000000",
"readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451619400000000000", "readings,name=truck_0,fleet=South,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451619405000000000",
"readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 1451606400000000000", "readings,name=truck_1,fleet=South,driver=Albert,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=72.45258,longitude=68.83761,elevation=255,velocity=0,heading=181,grade=0,fuel_consumption=25 145160640600000000",
"readings,name=truck_2,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606400000000000", "readings,name=truck_2,driver=Derek,model=F-150,device_version=v1.5 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451606407000000000",
"readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609400000000000", "readings,name=truck_2,fleet=North,driver=Derek,model=F-150 load_capacity=2000,fuel_capacity=200,nominal_fuel_consumption=15,latitude=24.5208,longitude=28.09377,elevation=428,velocity=0,heading=304,grade=0,fuel_consumption=25 1451609408000000000",
"readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629400000000000", "readings,fleet=South,name=truck_0,driver=Trish,model=H-2,device_version=v2.3 fuel_consumption=25,grade=0 1451629409000000000",
"stable,t1=t1,t2=t2,t3=t3 c1=1,c2=2,c3=3,c4=4 1451629500000000000", "stable,t1=t1,t2=t2,t3=t3 c1=1,c2=2,c3=\"kk\",c4=4 1451629501000000000",
"stable,t2=t2,t1=t1,t3=t3 c1=1,c3=3,c4=4 1451629600000000000", "stable,t2=t2,t1=t1,t3=t3 c1=1,c3=\"\",c4=4 1451629602000000000",
}; };
int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0])); int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
// TAOS_RES *res = taos_query(taos, "select * from t_6885c584b98481584ee13dac399e173d"); // case 1
// ASSERT_NE(res, nullptr); TAOS_RES *res = taos_query(taos, "select * from t_91e0b182be80332b5c530cbf872f760e");
// int fieldNum = taos_field_count(res); ASSERT_NE(res, nullptr);
// ASSERT_EQ(fieldNum, 5); int fieldNum = taos_field_count(res);
// int rowNum = taos_affected_rows(res); ASSERT_EQ(fieldNum, 11);
// ASSERT_EQ(rowNum, 2); printf("fieldNum:%d\n", fieldNum);
// for (int i = 0; i < rowNum; ++i) {
// TAOS_ROW rows = taos_fetch_row(res); TAOS_ROW row = NULL;
// } int32_t rowIndex = 0;
// taos_free_result(res); while((row = taos_fetch_row(res)) != NULL) {
int64_t ts = *(int64_t*)row[0];
double load_capacity = *(double*)row[1];
double fuel_capacity = *(double*)row[2];
double nominal_fuel_consumption = *(double*)row[3];
double latitude = *(double*)row[4];
double longitude = *(double*)row[5];
double elevation = *(double*)row[6];
double velocity = *(double*)row[7];
double heading = *(double*)row[8];
double grade = *(double*)row[9];
double fuel_consumption = *(double*)row[10];
if(rowIndex == 0){
ASSERT_EQ(ts, 1451606407000);
ASSERT_EQ(load_capacity, 2000);
ASSERT_EQ(fuel_capacity, 200);
ASSERT_EQ(nominal_fuel_consumption, 15);
ASSERT_EQ(latitude, 24.5208);
ASSERT_EQ(longitude, 28.09377);
ASSERT_EQ(elevation, 428);
ASSERT_EQ(velocity, 0);
ASSERT_EQ(heading, 304);
ASSERT_EQ(grade, 0);
ASSERT_EQ(fuel_consumption, 25);
}else{
ASSERT_FALSE(1);
}
rowIndex++;
}
taos_free_result(res);
// case 2
res = taos_query(taos, "select * from t_6885c584b98481584ee13dac399e173d");
ASSERT_NE(res, nullptr);
fieldNum = taos_field_count(res);
ASSERT_EQ(fieldNum, 5);
printf("fieldNum:%d\n", fieldNum);
rowIndex = 0;
while((row = taos_fetch_row(res)) != NULL) {
int *length = taos_fetch_lengths(res);
int64_t ts = *(int64_t*)row[0];
double c1 = *(double*)row[1];
double c4 = *(double*)row[4];
if(rowIndex == 0){
ASSERT_EQ(ts, 1451629501000);
ASSERT_EQ(c1, 1);
ASSERT_EQ(*(double*)row[2], 2);
ASSERT_EQ(length[3], 2);
ASSERT_EQ(memcmp(row[3], "kk", length[3]), 0);
ASSERT_EQ(c4, 4);
}else if(rowIndex == 1){
ASSERT_EQ(ts, 1451629602000);
ASSERT_EQ(c1, 1);
ASSERT_EQ(row[2], nullptr);
ASSERT_EQ(length[3], 0);
ASSERT_EQ(c4, 4);
}else{
ASSERT_FALSE(1);
}
rowIndex++;
}
taos_free_result(res);
// case 2
res = taos_query(taos, "show tables");
ASSERT_NE(res, nullptr);
row = taos_fetch_row(res);
int rowNum = taos_affected_rows(res);
ASSERT_EQ(rowNum, 5);
taos_free_result(res);
destroyRequest(request); destroyRequest(request);
smlDestroyInfo(info); smlDestroyInfo(info);
} }
@ -544,7 +618,7 @@ TEST(testCase, smlParseLine_error_Test) {
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr); ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr); ASSERT_NE(info, nullptr);
const char *sql[] = { const char *sql[] = {
@ -584,16 +658,16 @@ TEST(testCase, smlProcess_telnet_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr); ASSERT_NE(taos, nullptr);
TAOS_RES* pRes = taos_query(taos, "create database if not exists sml_db"); TAOS_RES* pRes = taos_query(taos, "create database if not exists telnet_db");
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db"); pRes = taos_query(taos, "use telnet_db");
taos_free_result(pRes); taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr); ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr); ASSERT_NE(info, nullptr);
const char *sql[] = { const char *sql[] = {
@ -605,27 +679,31 @@ TEST(testCase, smlProcess_telnet_Test) {
int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0])); int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0]));
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
// TAOS_RES *res = taos_query(taos, "select * from t_8c30283b3c4131a071d1e16cf6d7094a"); // case 1
// ASSERT_NE(res, nullptr); TAOS_RES *res = taos_query(taos, "select * from t_8c30283b3c4131a071d1e16cf6d7094a");
// int fieldNum = taos_field_count(res); ASSERT_NE(res, nullptr);
// ASSERT_EQ(fieldNum, 2); int fieldNum = taos_field_count(res);
// int rowNum = taos_affected_rows(res); ASSERT_EQ(fieldNum, 2);
// ASSERT_EQ(rowNum, 1);
// for (int i = 0; i < rowNum; ++i) { TAOS_ROW row = taos_fetch_row(res);
// TAOS_ROW rows = taos_fetch_row(res); int64_t ts = *(int64_t*)row[0];
// } double c1 = *(double*)row[1];
// taos_free_result(res); ASSERT_EQ(ts, 1479496100000);
ASSERT_EQ(c1, 42);
int rowNum = taos_affected_rows(res);
ASSERT_EQ(rowNum, 1);
taos_free_result(res);
// case 2
res = taos_query(taos, "show tables");
ASSERT_NE(res, nullptr);
row = taos_fetch_row(res);
rowNum = taos_affected_rows(res);
ASSERT_EQ(rowNum, 3);
taos_free_result(res);
// res = taos_query(taos, "select * from t_6931529054e5637ca92c78a1ad441961");
// ASSERT_NE(res, nullptr);
// fieldNum = taos_field_count(res);
// ASSERT_EQ(fieldNum, 2);
// rowNum = taos_affected_rows(res);
// ASSERT_EQ(rowNum, 2);
// for (int i = 0; i < rowNum; ++i) {
// TAOS_ROW rows = taos_fetch_row(res);
// }
// taos_free_result(res);
destroyRequest(request); destroyRequest(request);
smlDestroyInfo(info); smlDestroyInfo(info);
} }
@ -634,16 +712,16 @@ TEST(testCase, smlProcess_json1_Test) {
TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0); TAOS *taos = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(taos, nullptr); ASSERT_NE(taos, nullptr);
TAOS_RES *pRes = taos_query(taos, "create database if not exists sml_db"); TAOS_RES *pRes = taos_query(taos, "create database if not exists json_db");
taos_free_result(pRes); taos_free_result(pRes);
pRes = taos_query(taos, "use sml_db"); pRes = taos_query(taos, "use json_db");
taos_free_result(pRes); taos_free_result(pRes);
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT); SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr); ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr); ASSERT_NE(info, nullptr);
const char *sql = const char *sql =
@ -670,16 +748,31 @@ TEST(testCase, smlProcess_json1_Test) {
int ret = smlProcess(info, (char **)(&sql), -1); int ret = smlProcess(info, (char **)(&sql), -1);
ASSERT_EQ(ret, 0); ASSERT_EQ(ret, 0);
// TAOS_RES *res = taos_query(taos, "select * from t_cb27a7198d637b4f1c6464bd73f756a7"); // case 1
// ASSERT_NE(res, nullptr); TAOS_RES *res = taos_query(taos, "select * from t_cb27a7198d637b4f1c6464bd73f756a7");
// int fieldNum = taos_field_count(res); ASSERT_NE(res, nullptr);
// ASSERT_EQ(fieldNum, 2); int fieldNum = taos_field_count(res);
// int rowNum = taos_affected_rows(res); ASSERT_EQ(fieldNum, 2);
// ASSERT_EQ(rowNum, 1);
// for (int i = 0; i < rowNum; ++i) { TAOS_ROW row = taos_fetch_row(res);
// TAOS_ROW rows = taos_fetch_row(res); int64_t ts = *(int64_t*)row[0];
// } double c1 = *(double*)row[1];
// taos_free_result(res); ASSERT_EQ(ts, 1346846400000);
ASSERT_EQ(c1, 18);
int rowNum = taos_affected_rows(res);
ASSERT_EQ(rowNum, 1);
taos_free_result(res);
// case 2
res = taos_query(taos, "show tables");
ASSERT_NE(res, nullptr);
row = taos_fetch_row(res);
rowNum = taos_affected_rows(res);
ASSERT_EQ(rowNum, 2);
taos_free_result(res);
destroyRequest(request); destroyRequest(request);
smlDestroyInfo(info); smlDestroyInfo(info);
} }
@ -697,7 +790,7 @@ TEST(testCase, smlProcess_json2_Test) {
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT); SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr); ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr); ASSERT_NE(info, nullptr);
const char *sql = const char *sql =
"{\n" "{\n"
@ -741,7 +834,7 @@ TEST(testCase, smlProcess_json3_Test) {
SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT); SRequestObj *request = (SRequestObj *)createRequest((STscObj *)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr); ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr); ASSERT_NE(info, nullptr);
const char *sql = const char *sql =
"{\n" "{\n"
@ -813,7 +906,7 @@ TEST(testCase, smlProcess_json4_Test) {
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr); ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr); ASSERT_NE(info, nullptr);
const char *sql = "{\n" const char *sql = "{\n"
" \"metric\": \"meter_current2\",\n" " \"metric\": \"meter_current2\",\n"
@ -875,7 +968,7 @@ TEST(testCase, smlParseTelnetLine_error_Test) {
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr); ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr); ASSERT_NE(info, nullptr);
int32_t ret = 0; int32_t ret = 0;
@ -924,7 +1017,7 @@ TEST(testCase, smlParseTelnetLine_diff_type_Test) {
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr); ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr); ASSERT_NE(info, nullptr);
const char *sql[2] = { const char *sql[2] = {
@ -951,7 +1044,7 @@ TEST(testCase, smlParseTelnetLine_json_error_Test) {
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr); ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr); ASSERT_NE(info, nullptr);
int32_t ret = 0; int32_t ret = 0;
@ -1019,7 +1112,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type1_Test) {
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr); ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr); ASSERT_NE(info, nullptr);
const char *sql[2] = { const char *sql[2] = {
@ -1064,7 +1157,7 @@ TEST(testCase, smlParseTelnetLine_diff_json_type2_Test) {
SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT);
ASSERT_NE(request, nullptr); ASSERT_NE(request, nullptr);
SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS);
ASSERT_NE(info, nullptr); ASSERT_NE(info, nullptr);
const char *sql[2] = { const char *sql[2] = {

View File

@ -76,6 +76,11 @@ int32_t tsTelemInterval = 86400;
char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com"; char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com";
uint16_t tsTelemPort = 80; uint16_t tsTelemPort = 80;
// schemaless
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; //user defined child table name can be specified in tag value.
//If set to empty system will generate table name using MD5 hash.
bool tsSmlDataFormat = true; // true means that the name and order of cols in each line are the same(only for influx protocol)
// query // query
int32_t tsQueryPolicy = 1; int32_t tsQueryPolicy = 1;
@ -319,6 +324,8 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1; if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1; if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
tsNumOfTaskQueueThreads = tsNumOfCores / 4; tsNumOfTaskQueueThreads = tsNumOfCores / 4;
tsNumOfTaskQueueThreads = TRANGE(tsNumOfTaskQueueThreads, 1, 2); tsNumOfTaskQueueThreads = TRANGE(tsNumOfTaskQueueThreads, 1, 2);
@ -513,6 +520,9 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
return -1; return -1;
} }
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32; tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
tsCompressMsgSize = cfgGetItem(pCfg, "compressMsgSize")->i32; tsCompressMsgSize = cfgGetItem(pCfg, "compressMsgSize")->i32;
tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32; tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32;

View File

@ -17,3 +17,7 @@ TARGET_INCLUDE_DIRECTORIES(
PUBLIC "${TD_SOURCE_DIR}/source/libs/parser/inc" PUBLIC "${TD_SOURCE_DIR}/source/libs/parser/inc"
PRIVATE "${TD_SOURCE_DIR}/source/libs/scalar/inc" PRIVATE "${TD_SOURCE_DIR}/source/libs/scalar/inc"
) )
#add_test(
# NAME scalarTest
# COMMAND scalarTest
#)