From 5d89e7fada160bf04fdf51bfaa999e3330f42daa Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 12 May 2022 22:48:27 +0800 Subject: [PATCH] refactor: add telnet and json protocol for schemaless --- source/client/src/clientSml.c | 27 +++++----- source/client/test/smlTest.cpp | 99 +++++++++++++++------------------- 2 files changed, 56 insertions(+), 70 deletions(-) diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index 5d7127dab4..92144dbcd5 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -239,11 +239,11 @@ static int32_t smlBuildColumnDescription(SSmlKv* field, char* buf, int32_t bufSi memcpy(tname, field->key, field->keyLen); if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) { int32_t bytes = field->valueLen; // todo - int out = snprintf(buf, bufSize,"%s %s(%d)", + int out = snprintf(buf, bufSize,"`%s` %s(%d)", tname,tDataTypes[field->type].name, bytes); *outBytes = out; } else { - int out = snprintf(buf, bufSize, "%s %s", tname, tDataTypes[type].name); + int out = snprintf(buf, bufSize, "`%s` %s", tname, tDataTypes[type].name); *outBytes = out; } @@ -352,7 +352,7 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) { break; } case SCHEMA_ACTION_CREATE_STABLE: { - int n = sprintf(result, "create stable %s (", action->createSTable.sTableName); + int n = sprintf(result, "create stable `%s` (", action->createSTable.sTableName); char* pos = result + n; int freeBytes = capacity - n; SArray *cols = action->createSTable.fields; @@ -1426,7 +1426,7 @@ cleanup: /************* TSDB_SML_JSON_PROTOCOL function start **************/ static int32_t smlJsonCreateSring(const char **output, char *input, int32_t inputLen){ - *output = taosMemoryMalloc(inputLen); + *output = (const char *)taosMemoryMalloc(inputLen); if (*output == NULL){ return TSDB_CODE_TSC_OUT_OF_MEMORY; } @@ -1754,7 +1754,7 @@ static int32_t smlParseValueFromJSON(cJSON *root, SSmlKv *kv) { * user configured parameter tsDefaultJSONStrType */ - char *tsDefaultJSONStrType = "nchar"; //todo + char *tsDefaultJSONStrType = "binary"; //todo smlConvertJSONString(kv, tsDefaultJSONStrType, root); break; } @@ -1954,14 +1954,15 @@ static int32_t smlParseInfluxLine(SSmlHandle* info, const char* sql) { } bool hasTable = true; + SSmlTableInfo *tinfo = NULL; SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, elements.measure, elements.measureTagsLen); if(!oneTable){ - SSmlTableInfo *tinfo = smlBuildTableInfo(); + tinfo = smlBuildTableInfo(); if(!tinfo){ return TSDB_CODE_TSC_OUT_OF_MEMORY; } taosHashPut(info->childTables, elements.measure, elements.measureTagsLen, &tinfo, POINTER_BYTES); - *oneTable = tinfo; + oneTable = &tinfo; hasTable = false; } @@ -1984,7 +1985,7 @@ static int32_t smlParseInfluxLine(SSmlHandle* info, const char* sql) { (*oneTable)->sTableName = elements.measure; (*oneTable)->sTableNameLen = elements.measureLen; - RandTableName rName = {.tags=(*oneTable)->tags, .sTableName=(*oneTable)->sTableName, .sTableNameLen=(*oneTable)->sTableNameLen, + RandTableName rName = {.tags=(*oneTable)->tags, .sTableName=(*oneTable)->sTableName, .sTableNameLen=(uint8_t)(*oneTable)->sTableNameLen, .childTableName=(*oneTable)->childTableName}; buildChildTableName(&rName); (*oneTable)->uid = rName.uid; @@ -2031,7 +2032,7 @@ static int32_t smlParseTelnetLine(SSmlHandle* info, void *data) { ASSERT(0); } if(ret != TSDB_CODE_SUCCESS){ - uError("SML:0x%"PRIx64" smlParseInflux failed", info->id); + uError("SML:0x%"PRIx64" smlParseTelnetLine failed", info->id); smlDestroyTableInfo(tinfo, true); taosArrayDestroy(cols); return ret; @@ -2043,23 +2044,23 @@ static int32_t smlParseTelnetLine(SSmlHandle* info, void *data) { } taosHashClear(info->dumplicateKey); - RandTableName rName = {.tags=tinfo->tags, .sTableName=tinfo->sTableName, .sTableNameLen=tinfo->sTableNameLen, + RandTableName rName = {.tags=tinfo->tags, .sTableName=tinfo->sTableName, .sTableNameLen=(uint8_t)tinfo->sTableNameLen, .childTableName=tinfo->childTableName}; buildChildTableName(&rName); tinfo->uid = rName.uid; bool hasTable = true; - SSmlTableInfo **oneTable = taosHashGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName)); + SSmlTableInfo **oneTable = (SSmlTableInfo **)taosHashGet(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName)); if(!oneTable) { taosHashPut(info->childTables, tinfo->childTableName, strlen(tinfo->childTableName), &tinfo, POINTER_BYTES); - *oneTable = tinfo; + oneTable = &tinfo; hasTable = false; }else{ smlDestroyTableInfo(tinfo, true); } taosArrayPush((*oneTable)->cols, &cols); - SSmlSTableMeta** tableMeta = taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen); + SSmlSTableMeta** tableMeta = (SSmlSTableMeta** )taosHashGet(info->superTables, (*oneTable)->sTableName, (*oneTable)->sTableNameLen); if(tableMeta){ // update meta ret = smlUpdateMeta(*tableMeta, hasTable ? NULL : (*oneTable)->tags, cols, &info->msgBuf); if(!ret){ diff --git a/source/client/test/smlTest.cpp b/source/client/test/smlTest.cpp index 7bd7a84a6e..f579489e62 100644 --- a/source/client/test/smlTest.cpp +++ b/source/client/test/smlTest.cpp @@ -140,15 +140,13 @@ TEST(testCase, smlParseCols_Error_Test) { "c=f64", // double "c=8f64f", "c=8ef64", - "c=1.7976931348623158e+390f64", "c=f32", // float "c=8f32f", "c=8wef32", "c=-3.402823466e+39f32", - "c=", // float + "c=", // double "c=8f", "c=8we", - "c=3.402823466e+39", "c=i8", // tiny int "c=-8i8f", "c=8wei8", @@ -218,7 +216,7 @@ TEST(testCase, smlParseCols_tag_Test) { SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); const char *data = - "cbin=\"passit hello,c=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf32_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\""; + "cbin=\"passit helloc=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf64_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\""; int32_t len = strlen(data); int32_t ret = smlParseCols(data, len, cols, true, dumplicateKey, &msgBuf); ASSERT_EQ(ret, TSDB_CODE_SUCCESS); @@ -230,7 +228,7 @@ TEST(testCase, smlParseCols_tag_Test) { ASSERT_EQ(strncasecmp(kv->key, "cbin", 4), 0); ASSERT_EQ(kv->keyLen, 4); ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR); - ASSERT_EQ(kv->valueLen, 18); + ASSERT_EQ(kv->valueLen, 17); ASSERT_EQ(strncasecmp(kv->value, "\"passit", 7), 0); taosMemoryFree(kv); @@ -280,7 +278,7 @@ TEST(testCase, smlParseCols_Test) { SHashObj *dumplicateKey = taosHashInit(32, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY), false, HASH_NO_LOCK); - const char *data = "cbin=\"passit hello,c=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf32_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\""; + const char *data = "cbin=\"passit hello,c=2\",cnch=L\"iisdfsf\",cbool=false,cf64=4.31f64,cf64_=8.32,cf32=8.23f32,ci8=-34i8,cu8=89u8,ci16=233i16,cu16=898u16,ci32=98289i32,cu32=12323u32,ci64=-89238i64,ci=989i,cu64=8989323u64,cbooltrue=true,cboolt=t,cboolf=f,cnch_=l\"iuwq\""; int32_t len = strlen(data); int32_t ret = smlParseCols(data, len, cols, false, dumplicateKey, &msgBuf); ASSERT_EQ(ret, TSDB_CODE_SUCCESS); @@ -321,17 +319,17 @@ TEST(testCase, smlParseCols_Test) { ASSERT_EQ(kv->type, TSDB_DATA_TYPE_DOUBLE); ASSERT_EQ(kv->length, 8); //ASSERT_EQ(kv->d, 4.31); - printf("4.31 = kv->f:%f\n", kv->d); + printf("4.31 = kv->d:%f\n", kv->d); taosMemoryFree(kv); // float kv = (SSmlKv *)taosArrayGetP(cols, 4); - ASSERT_EQ(strncasecmp(kv->key, "cf32_", 5), 0); + ASSERT_EQ(strncasecmp(kv->key, "cf64_", 5), 0); ASSERT_EQ(kv->keyLen, 5); - ASSERT_EQ(kv->type, TSDB_DATA_TYPE_FLOAT); - ASSERT_EQ(kv->length, 4); + ASSERT_EQ(kv->type, TSDB_DATA_TYPE_DOUBLE); + ASSERT_EQ(kv->length, 8); //ASSERT_EQ(kv->f, 8.32); - printf("8.32 = kv->f:%f\n", kv->f); + printf("8.32 = kv->d:%f\n", kv->d); taosMemoryFree(kv); // float @@ -496,22 +494,13 @@ TEST(testCase, smlProcess_influx_Test) { "stable,t1=t1,t2=t2,t3=t3 c1=1,c2=2,c3=3,c4=4 1451629500000000000", "stable,t2=t2,t1=t1,t3=t3 c1=1,c3=3,c4=4 1451629600000000000" }; - smlProcess(info, (char**)sql, 11); + smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0])); - TAOS_RES *res = taos_query(taos, "select * from"); + TAOS_RES *res = taos_query(taos, "select * from t_6885c584b98481584ee13dac399e173d"); ASSERT_NE(res, nullptr); int fieldNum = taos_field_count(res); ASSERT_EQ(fieldNum, 11); int rowNum = taos_affected_rows(res); - for (int i = 0; i < rowNum; ++i) { - TAOS_ROW rows = taos_fetch_row(res); - } - - res = taos_query(taos, "select * from"); - ASSERT_NE(res, nullptr); - fieldNum = taos_field_count(res); - ASSERT_EQ(fieldNum, 4); - rowNum = taos_affected_rows(res); ASSERT_EQ(rowNum, 2); for (int i = 0; i < rowNum; ++i) { TAOS_ROW rows = taos_fetch_row(res); @@ -539,7 +528,7 @@ TEST(testCase, smlParseLine_error_Test) { "measure,t1=3 c1=8", "measure,t2=3 c1=8u8" }; - int ret = smlProcess(info, (char **)sql, 2); + int ret = smlProcess(info, (char **)sql, sizeof(sql)/sizeof(sql[0])); ASSERT_NE(ret, 0); } @@ -582,28 +571,29 @@ TEST(testCase, smlParseLine_error_Test) { SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_TELNET_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); ASSERT_NE(info, nullptr); - const char *sql[11] = { - "sys.if.bytes.out 1479496100 1.3E3 host=web01 interface=eth0" - "sys.if.bytes.out 1479496200 1.3E3 interface=eth0 host=web01 ", - "sys.if.bytes.out 1479496300 1.3E3 network=tcp" - "sys.procs.running 1479496400 42 host=web01", + const char *sql[4] = { + "sys.if.bytes.out 1479496100 1.3E0 host=web01 interface=eth0", + "sys.if.bytes.out 1479496101 1.3E1 interface=eth0 host=web01 ", + "sys.if.bytes.out 1479496102 1.3E3 network=tcp", + "sys.procs.running 1479496100 42 host=web01" }; - int ret = smlProcess(info, (char**)sql, 11); + int ret = smlProcess(info, (char**)sql, sizeof(sql)/sizeof(sql[0])); ASSERT_EQ(ret, 0); - TAOS_RES *res = taos_query(taos, "select * from"); + TAOS_RES *res = taos_query(taos, "select * from t_8c30283b3c4131a071d1e16cf6d7094a"); ASSERT_NE(res, nullptr); int fieldNum = taos_field_count(res); - ASSERT_EQ(fieldNum, 11); + ASSERT_EQ(fieldNum, 2); int rowNum = taos_affected_rows(res); + ASSERT_EQ(rowNum, 1); for (int i = 0; i < rowNum; ++i) { TAOS_ROW rows = taos_fetch_row(res); } - res = taos_query(taos, "select * from"); + res = taos_query(taos, "select * from t_6931529054e5637ca92c78a1ad441961"); ASSERT_NE(res, nullptr); fieldNum = taos_field_count(res); - ASSERT_EQ(fieldNum, 4); + ASSERT_EQ(fieldNum, 2); rowNum = taos_affected_rows(res); ASSERT_EQ(rowNum, 2); for (int i = 0; i < rowNum; ++i) { @@ -624,7 +614,7 @@ TEST(testCase, smlParseLine_error_Test) { SRequestObj *request = (SRequestObj *)createRequest((STscObj*)taos, NULL, NULL, TSDB_SQL_INSERT); ASSERT_NE(request, nullptr); - SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_LINE_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); + SSmlHandle *info = smlBuildSmlInfo(taos, request, TSDB_SML_JSON_PROTOCOL, TSDB_SML_TIMESTAMP_NANO_SECONDS, true); ASSERT_NE(info, nullptr); const char *sql = "[\n" @@ -647,26 +637,18 @@ TEST(testCase, smlParseLine_error_Test) { " }\n" " }\n" "]"; - smlProcess(info, (char**)sql, 11); + int ret = smlProcess(info, (char**)(&sql), -1); + ASSERT_EQ(ret, 0); - TAOS_RES *res = taos_query(taos, "select * from"); + TAOS_RES *res = taos_query(taos, "select * from t_cb27a7198d637b4f1c6464bd73f756a7"); ASSERT_NE(res, nullptr); int fieldNum = taos_field_count(res); - ASSERT_EQ(fieldNum, 11); - int rowNum = taos_affected_rows(res); - for (int i = 0; i < rowNum; ++i) { - TAOS_ROW rows = taos_fetch_row(res); - } - - res = taos_query(taos, "select * from"); - ASSERT_NE(res, nullptr); - fieldNum = taos_field_count(res); - ASSERT_EQ(fieldNum, 4); - rowNum = taos_affected_rows(res); - ASSERT_EQ(rowNum, 2); - for (int i = 0; i < rowNum; ++i) { - TAOS_ROW rows = taos_fetch_row(res); - } + ASSERT_EQ(fieldNum, 2); +// int rowNum = taos_affected_rows(res); +// ASSERT_EQ(rowNum, 1); +// for (int i = 0; i < rowNum; ++i) { +// TAOS_ROW rows = taos_fetch_row(res); +// } sql = "{\n" " \"metric\": \"meter_current\",\n" @@ -685,12 +667,13 @@ TEST(testCase, smlParseLine_error_Test) { " },\n" " \"location\": { \n" " \"value\" : \"北京\",\n" - " \"type\" : \"nchar\"\n" + " \"type\" : \"binary\"\n" " },\n" " \"id\": \"d1001\"\n" " }\n" "}"; - smlProcess(info, (char**)sql, 11); + ret = smlProcess(info, (char**)(&sql), -1); + ASSERT_EQ(ret, 0); sql = "{\n" " \"metric\": \"meter_current\",\n" @@ -733,7 +716,7 @@ TEST(testCase, smlParseLine_error_Test) { " },\n" " \"t8\": { \n" " \"value\" : \"北京\",\n" - " \"type\" : \"nchar\"\n" + " \"type\" : \"binary\"\n" " },\n" " \"t9\": { \n" " \"value\" : true,\n" @@ -742,7 +725,8 @@ TEST(testCase, smlParseLine_error_Test) { " \"id\": \"d1001\"\n" " }\n" "}"; - smlProcess(info, (char**)sql, 11); + ret = smlProcess(info, (char**)(&sql), -1); + ASSERT_EQ(ret, 0); sql = "{\n" " \"metric\": \"meter_current\",\n" @@ -779,11 +763,12 @@ TEST(testCase, smlParseLine_error_Test) { " \"t7\": \"nsj\",\n" " \"t8\": { \n" " \"value\" : \"北京\",\n" - " \"type\" : \"nchar\"\n" + " \"type\" : \"binary\"\n" " },\n" " \"t9\": false,\n" " \"id\": \"d1001\"\n" " }\n" "}"; - smlProcess(info, (char**)sql, 11); + ret = smlProcess(info, (char**)(&sql), -1); + ASSERT_EQ(ret, 0); }