Merge pull request #12803 from taosdata/feature/TD-14761
fea: add null tag support in schemaless
This commit is contained in:
commit
62e3cb1617
|
@ -128,6 +128,7 @@ extern bool tsStartUdfd;
|
||||||
|
|
||||||
// schemaless
|
// schemaless
|
||||||
extern char tsSmlChildTableName[];
|
extern char tsSmlChildTableName[];
|
||||||
|
extern char tsSmlTagName[];
|
||||||
extern bool tsSmlDataFormat;
|
extern bool tsSmlDataFormat;
|
||||||
|
|
||||||
// internal
|
// internal
|
||||||
|
|
|
@ -63,10 +63,6 @@ for (int i = 1; i < keyLen; ++i) { \
|
||||||
|
|
||||||
#define TS "_ts"
|
#define TS "_ts"
|
||||||
#define TS_LEN 3
|
#define TS_LEN 3
|
||||||
#define TAG "_tag"
|
|
||||||
#define TAG_LEN 4
|
|
||||||
#define TAG_VALUE "NULL"
|
|
||||||
#define TAG_VALUE_LEN 4
|
|
||||||
#define VALUE "value"
|
#define VALUE "value"
|
||||||
#define VALUE_LEN 5
|
#define VALUE_LEN 5
|
||||||
|
|
||||||
|
@ -263,7 +259,7 @@ static int32_t smlBuildColumnDescription(SSmlKv* field, char* buf, int32_t bufSi
|
||||||
memcpy(tname, field->key, field->keyLen);
|
memcpy(tname, field->key, field->keyLen);
|
||||||
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
if (type == TSDB_DATA_TYPE_BINARY || type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
int32_t bytes = field->length > CHAR_SAVE_LENGTH ? (2*field->length) : CHAR_SAVE_LENGTH;
|
int32_t bytes = field->length > CHAR_SAVE_LENGTH ? (2*field->length) : CHAR_SAVE_LENGTH;
|
||||||
int out = snprintf(buf, bufSize,"`%s` %s(%d)",
|
int out = snprintf(buf, bufSize, "`%s` %s(%d)",
|
||||||
tname, tDataTypes[field->type].name, bytes);
|
tname, tDataTypes[field->type].name, bytes);
|
||||||
*outBytes = out;
|
*outBytes = out;
|
||||||
} else {
|
} else {
|
||||||
|
@ -400,6 +396,12 @@ static int32_t smlApplySchemaAction(SSmlHandle* info, SSchemaAction* action) {
|
||||||
pos += outBytes; freeBytes -= outBytes;
|
pos += outBytes; freeBytes -= outBytes;
|
||||||
*pos = ','; ++pos; --freeBytes;
|
*pos = ','; ++pos; --freeBytes;
|
||||||
}
|
}
|
||||||
|
if(taosArrayGetSize(cols) == 0){
|
||||||
|
outBytes = snprintf(pos, freeBytes,"`%s` %s(%d)",
|
||||||
|
tsSmlTagName, tDataTypes[TSDB_DATA_TYPE_NCHAR].name, CHAR_SAVE_LENGTH);
|
||||||
|
pos += outBytes; freeBytes -= outBytes;
|
||||||
|
*pos = ','; ++pos; --freeBytes;
|
||||||
|
}
|
||||||
pos--; ++freeBytes;
|
pos--; ++freeBytes;
|
||||||
outBytes = snprintf(pos, freeBytes, ")");
|
outBytes = snprintf(pos, freeBytes, ")");
|
||||||
TAOS_RES* res = taos_query(info->taos, result);
|
TAOS_RES* res = taos_query(info->taos, result);
|
||||||
|
@ -1112,14 +1114,6 @@ static int32_t smlParseTelnetString(SSmlHandle *info, const char* sql, SSmlTable
|
||||||
|
|
||||||
static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, char *childTableName, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
|
static int32_t smlParseCols(const char* data, int32_t len, SArray *cols, char *childTableName, bool isTag, SHashObj *dumplicateKey, SSmlMsgBuf *msg){
|
||||||
if(isTag && len == 0){
|
if(isTag && len == 0){
|
||||||
SSmlKv *kv = (SSmlKv *)taosMemoryCalloc(sizeof(SSmlKv), 1);
|
|
||||||
if(!kv) return TSDB_CODE_OUT_OF_MEMORY;
|
|
||||||
kv->key = TAG;
|
|
||||||
kv->keyLen = TAG_LEN;
|
|
||||||
kv->value = TAG_VALUE;
|
|
||||||
kv->length = TAG_VALUE_LEN;
|
|
||||||
kv->type = TSDB_DATA_TYPE_NCHAR;
|
|
||||||
if(cols) taosArrayPush(cols, &kv);
|
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -269,16 +269,7 @@ TEST(testCase, smlParseCols_tag_Test) {
|
||||||
ret = smlParseCols(data, len, cols, NULL, true, dumplicateKey, &msgBuf);
|
ret = smlParseCols(data, len, cols, NULL, true, dumplicateKey, &msgBuf);
|
||||||
ASSERT_EQ(ret, TSDB_CODE_SUCCESS);
|
ASSERT_EQ(ret, TSDB_CODE_SUCCESS);
|
||||||
size = taosArrayGetSize(cols);
|
size = taosArrayGetSize(cols);
|
||||||
ASSERT_EQ(size, 1);
|
ASSERT_EQ(size, 0);
|
||||||
|
|
||||||
// nchar
|
|
||||||
kv = (SSmlKv *)taosArrayGetP(cols, 0);
|
|
||||||
ASSERT_EQ(strncasecmp(kv->key, TAG, TAG_LEN), 0);
|
|
||||||
ASSERT_EQ(kv->keyLen, TAG_LEN);
|
|
||||||
ASSERT_EQ(kv->type, TSDB_DATA_TYPE_NCHAR);
|
|
||||||
ASSERT_EQ(kv->length, TAG_LEN);
|
|
||||||
ASSERT_EQ(strncasecmp(kv->value, TAG_VALUE, TAG_VALUE_LEN), 0);
|
|
||||||
taosMemoryFree(kv);
|
|
||||||
|
|
||||||
taosArrayDestroy(cols);
|
taosArrayDestroy(cols);
|
||||||
taosHashCleanup(dumplicateKey);
|
taosHashCleanup(dumplicateKey);
|
||||||
|
@ -1207,7 +1198,8 @@ TEST(testCase, sml_TD15662_Test) {
|
||||||
ASSERT_NE(info, nullptr);
|
ASSERT_NE(info, nullptr);
|
||||||
|
|
||||||
const char *sql[] = {
|
const char *sql[] = {
|
||||||
"hetrey,id=sub_table_0123456,t0=f,t1=127i8,t2=32767i16,t3=2147483647i32,t4=9223372036854775807i64,t5=11.12345f32,t6=22.123456789f64,t7=\"binaryTagValue\",t8=L\"ncharTagValue\" c0=f,c1=127i8,c2=32767i16,c3=2147483647i32,c4=9223372036854775807i64,c5=11.12345f32,c6=22.123456789f64,c7=\"binaryColValue\",c8=L\"ncharColValue\",c9=7u64",
|
"hetrey c0=f,c1=127i8 1626006833639",
|
||||||
|
"hetrey,t1=r c0=f,c1=127i8 1626006833640",
|
||||||
};
|
};
|
||||||
int ret = smlProcess(info, (char **)sql, sizeof(sql) / sizeof(sql[0]));
|
int ret = smlProcess(info, (char **)sql, sizeof(sql) / sizeof(sql[0]));
|
||||||
ASSERT_EQ(ret, 0);
|
ASSERT_EQ(ret, 0);
|
||||||
|
|
|
@ -1077,7 +1077,7 @@ void tdResetKVRowBuilder(SKVRowBuilder *pBuilder) {
|
||||||
|
|
||||||
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
|
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
|
||||||
int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
|
int tlen = sizeof(SColIdx) * pBuilder->nCols + pBuilder->size;
|
||||||
if (tlen == 0) return NULL;
|
// if (tlen == 0) return NULL; // nCols == 0 means no tags
|
||||||
|
|
||||||
tlen += TD_KV_ROW_HEAD_SIZE;
|
tlen += TD_KV_ROW_HEAD_SIZE;
|
||||||
|
|
||||||
|
@ -1087,8 +1087,10 @@ SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder) {
|
||||||
kvRowSetNCols(row, pBuilder->nCols);
|
kvRowSetNCols(row, pBuilder->nCols);
|
||||||
kvRowSetLen(row, tlen);
|
kvRowSetLen(row, tlen);
|
||||||
|
|
||||||
memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
|
if(pBuilder->nCols > 0){
|
||||||
memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
|
memcpy(kvRowColIdx(row), pBuilder->pColIdx, sizeof(SColIdx) * pBuilder->nCols);
|
||||||
|
memcpy(kvRowValues(row), pBuilder->buf, pBuilder->size);
|
||||||
|
}
|
||||||
|
|
||||||
return row;
|
return row;
|
||||||
}
|
}
|
||||||
|
|
|
@ -78,6 +78,7 @@ char tsTelemServer[TSDB_FQDN_LEN] = "telemetry.taosdata.com";
|
||||||
uint16_t tsTelemPort = 80;
|
uint16_t tsTelemPort = 80;
|
||||||
|
|
||||||
// schemaless
|
// schemaless
|
||||||
|
char tsSmlTagName[TSDB_COL_NAME_LEN] = "__tag";
|
||||||
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; //user defined child table name can be specified in tag value.
|
char tsSmlChildTableName[TSDB_TABLE_NAME_LEN] = ""; //user defined child table name can be specified in tag value.
|
||||||
//If set to empty system will generate table name using MD5 hash.
|
//If set to empty system will generate table name using MD5 hash.
|
||||||
bool tsSmlDataFormat = true; // true means that the name and order of cols in each line are the same(only for influx protocol)
|
bool tsSmlDataFormat = true; // true means that the name and order of cols in each line are the same(only for influx protocol)
|
||||||
|
@ -326,6 +327,7 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
|
||||||
if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1;
|
if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1;
|
||||||
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
|
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
|
||||||
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
|
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
|
||||||
|
if (cfgAddString(pCfg, "smlTagName", "", 1) != 0) return -1;
|
||||||
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
|
if (cfgAddBool(pCfg, "smlDataFormat", tsSmlDataFormat, 1) != 0) return -1;
|
||||||
|
|
||||||
tsNumOfTaskQueueThreads = tsNumOfCores / 4;
|
tsNumOfTaskQueueThreads = tsNumOfCores / 4;
|
||||||
|
@ -522,6 +524,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
|
||||||
}
|
}
|
||||||
|
|
||||||
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
|
tstrncpy(tsSmlChildTableName, cfgGetItem(pCfg, "smlChildTableName")->str, TSDB_TABLE_NAME_LEN);
|
||||||
|
tstrncpy(tsSmlTagName, cfgGetItem(pCfg, "smlTagName")->str, TSDB_COL_NAME_LEN);
|
||||||
tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
|
tsSmlDataFormat = cfgGetItem(pCfg, "smlDataFormat")->bval;
|
||||||
|
|
||||||
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
|
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
|
||||||
|
|
|
@ -308,13 +308,10 @@ static int compareKv(const void* p1, const void* p2) {
|
||||||
* use stable name and tags to grearate child table name
|
* use stable name and tags to grearate child table name
|
||||||
*/
|
*/
|
||||||
void buildChildTableName(RandTableName* rName) {
|
void buildChildTableName(RandTableName* rName) {
|
||||||
int32_t size = taosArrayGetSize(rName->tags);
|
|
||||||
ASSERT(size > 0);
|
|
||||||
taosArraySort(rName->tags, compareKv);
|
|
||||||
|
|
||||||
SStringBuilder sb = {0};
|
SStringBuilder sb = {0};
|
||||||
taosStringBuilderAppendStringLen(&sb, rName->sTableName, rName->sTableNameLen);
|
taosStringBuilderAppendStringLen(&sb, rName->sTableName, rName->sTableNameLen);
|
||||||
for (int j = 0; j < size; ++j) {
|
taosArraySort(rName->tags, compareKv);
|
||||||
|
for (int j = 0; j < taosArrayGetSize(rName->tags); ++j) {
|
||||||
SSmlKv* tagKv = taosArrayGetP(rName->tags, j);
|
SSmlKv* tagKv = taosArrayGetP(rName->tags, j);
|
||||||
taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
|
taosStringBuilderAppendStringLen(&sb, tagKv->key, tagKv->keyLen);
|
||||||
if(IS_VAR_DATA_TYPE(tagKv->type)){
|
if(IS_VAR_DATA_TYPE(tagKv->type)){
|
||||||
|
|
|
@ -827,7 +827,7 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint
|
||||||
|
|
||||||
SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
|
SKVRow row = tdGetKVRowFromBuilder(&pCxt->tagsBuilder);
|
||||||
if (NULL == row) {
|
if (NULL == row) {
|
||||||
return buildInvalidOperationMsg(&pCxt->msg, "tag value expected");
|
return buildInvalidOperationMsg(&pCxt->msg, "out of memory");
|
||||||
}
|
}
|
||||||
tdSortKVRowByColIdx(row);
|
tdSortKVRowByColIdx(row);
|
||||||
|
|
||||||
|
@ -1347,7 +1347,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tN
|
||||||
SKVRow row = tdGetKVRowFromBuilder(&tagBuilder);
|
SKVRow row = tdGetKVRowFromBuilder(&tagBuilder);
|
||||||
if (NULL == row) {
|
if (NULL == row) {
|
||||||
tdDestroyKVRowBuilder(&tagBuilder);
|
tdDestroyKVRowBuilder(&tagBuilder);
|
||||||
return buildInvalidOperationMsg(&pBuf, "tag value expected");
|
return buildInvalidOperationMsg(&pBuf, "out of memory");
|
||||||
}
|
}
|
||||||
tdSortKVRowByColIdx(row);
|
tdSortKVRowByColIdx(row);
|
||||||
|
|
||||||
|
@ -1696,7 +1696,7 @@ static int32_t smlBuildTagRow(SArray* cols, SKVRowBuilder* tagsBuilder, SParsedD
|
||||||
|
|
||||||
*row = tdGetKVRowFromBuilder(tagsBuilder);
|
*row = tdGetKVRowFromBuilder(tagsBuilder);
|
||||||
if (*row == NULL) {
|
if (*row == NULL) {
|
||||||
return TSDB_CODE_SML_INVALID_DATA;
|
return TSDB_CODE_OUT_OF_MEMORY;
|
||||||
}
|
}
|
||||||
tdSortKVRowByColIdx(*row);
|
tdSortKVRowByColIdx(*row);
|
||||||
return TSDB_CODE_SUCCESS;
|
return TSDB_CODE_SUCCESS;
|
||||||
|
|
Loading…
Reference in New Issue