Merge branch 'main' into merge/mainto3.0

This commit is contained in:
Shengliang Guan 2024-12-10 01:14:51 +00:00
commit 25ddd274f9
3 changed files with 144 additions and 128 deletions

View File

@ -28,13 +28,14 @@ In this document, it specifically refers to the internal levels of the second-le
- Default compression algorithms list and applicable range for each data type - Default compression algorithms list and applicable range for each data type
| Data Type | Available Encoding Algorithms | Default Encoding Algorithm | Available Compression Algorithms|Default Compression Algorithm| Default Compression Level| | Data Type |Available Encoding Algorithms | Default Encoding Algorithm | Available Compression Algorithms | Default Compression Algorithm | Default Compression Level |
| :-----------:|:----------:|:-------:|:-------:|:----------:|:----:| |:------------------------------------:|:----------------:|:-----------:|:--------------------:|:----:|:------:|
| tinyint/untinyint/smallint/usmallint/int/uint | simple8b| simple8b | lz4/zlib/zstd/xz| lz4 | medium| | int/uint | simple8b | simple8b | lz4/zlib/zstd/xz | lz4 | medium |
| bigint/ubigint/timestamp | simple8b/delta-i | delta-i |lz4/zlib/zstd/xz | lz4| medium| | tinyint/untinyint/smallint/usmallint | simple8b | simple8b | lz4/zlib/zstd/xz | zlib | medium |
|float/double | delta-d|delta-d |lz4/zlib/zstd/xz/tsz|lz4| medium| | bigint/ubigint/timestamp | simple8b/delta-i | delta-i | lz4/zlib/zstd/xz | lz4 | medium |
|binary/nchar| disabled| disabled|lz4/zlib/zstd/xz| lz4| medium| | float/double | delta-d | delta-d | lz4/zlib/zstd/xz/tsz | lz4 | medium |
|bool| bit-packing| bit-packing| lz4/zlib/zstd/xz| lz4| medium| | binary/nchar | disabled | disabled | lz4/zlib/zstd/xz | zstd | medium |
| bool | bit-packing | bit-packing | lz4/zlib/zstd/xz | zstd | medium |
## SQL Syntax ## SQL Syntax

View File

@ -29,14 +29,15 @@ description: 可配置压缩算法
- 各个数据类型的默认压缩算法列表和适用范围 - 各个数据类型的默认压缩算法列表和适用范围
| 数据类型 | 可选编码算法 | 编码算法默认值 | 可选压缩算法|压缩算法默认值| 压缩等级默认值| | 数据类型 | 可选编码算法 | 编码算法默认值 | 可选压缩算法 | 压缩算法默认值 |压缩等级默认值|
| :-----------:|:----------:|:-------:|:-------:|:----------:|:----:| |:------------------------------------:|:----------------:|:-----------:|:--------------------:|:----:|:------:|
| int/uint | simple8b| simple8b | lz4/zlib/zstd/xz| lz4 | medium| | int/uint | simple8b | simple8b | lz4/zlib/zstd/xz | lz4 | medium |
| tinyint/untinyint/smallint/usmallint | simple8b| simple8b | lz4/zlib/zstd/xz| zlib | medium| | tinyint/untinyint/smallint/usmallint | simple8b | simple8b | lz4/zlib/zstd/xz | zlib | medium |
| bigint/ubigint/timestamp | simple8b/delta-i | delta-i |lz4/zlib/zstd/xz | lz4| medium| | bigint/ubigint/timestamp | simple8b/delta-i | delta-i | lz4/zlib/zstd/xz | lz4 | medium |
|float/double | delta-d|delta-d |lz4/zlib/zstd/xz/tsz|lz4| medium| | float/double | delta-d | delta-d | lz4/zlib/zstd/xz/tsz | lz4 | medium |
|binary/nchar| disabled| disabled|lz4/zlib/zstd/xz| zstd| medium| | binary/nchar | disabled | disabled | lz4/zlib/zstd/xz | zstd | medium |
|bool| bit-packing| bit-packing| lz4/zlib/zstd/xz| zstd| medium| | bool | bit-packing | bit-packing | lz4/zlib/zstd/xz | zstd | medium |
## SQL 语法 ## SQL 语法

View File

@ -52,6 +52,22 @@
#define TMQ_META_VERSION "1.0" #define TMQ_META_VERSION "1.0"
static bool tmqAddJsonObjectItem(cJSON *object, const char *string, cJSON *item){
bool ret = cJSON_AddItemToObject(object, string, item);
if (!ret){
cJSON_Delete(item);
}
return ret;
}
static bool tmqAddJsonArrayItem(cJSON *array, cJSON *item){
bool ret = cJSON_AddItemToArray(array, item);
if (!ret){
cJSON_Delete(item);
}
return ret;
}
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen); static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen);
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); } static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t, static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t,
@ -68,41 +84,43 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche
cJSON* type = cJSON_CreateString("create"); cJSON* type = cJSON_CreateString("create");
RAW_NULL_CHECK(type); RAW_NULL_CHECK(type);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super"); cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
RAW_NULL_CHECK(tableType); RAW_NULL_CHECK(tableType);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
cJSON* tableName = cJSON_CreateString(name); cJSON* tableName = cJSON_CreateString(name);
RAW_NULL_CHECK(tableName); RAW_NULL_CHECK(tableName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
cJSON* columns = cJSON_CreateArray(); cJSON* columns = cJSON_CreateArray();
RAW_NULL_CHECK(columns); RAW_NULL_CHECK(columns);
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "columns", columns));
for (int i = 0; i < schemaRow->nCols; i++) { for (int i = 0; i < schemaRow->nCols; i++) {
cJSON* column = cJSON_CreateObject(); cJSON* column = cJSON_CreateObject();
RAW_NULL_CHECK(column); RAW_NULL_CHECK(column);
RAW_FALSE_CHECK(tmqAddJsonArrayItem(columns, column));
SSchema* s = schemaRow->pSchema + i; SSchema* s = schemaRow->pSchema + i;
cJSON* cname = cJSON_CreateString(s->name); cJSON* cname = cJSON_CreateString(s->name);
RAW_NULL_CHECK(cname); RAW_NULL_CHECK(cname);
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "name", cname)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "name", cname));
cJSON* ctype = cJSON_CreateNumber(s->type); cJSON* ctype = cJSON_CreateNumber(s->type);
RAW_NULL_CHECK(ctype); RAW_NULL_CHECK(ctype);
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "type", ctype)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "type", ctype));
if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) { if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
int32_t length = s->bytes - VARSTR_HEADER_SIZE; int32_t length = s->bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "length", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
} else if (s->type == TSDB_DATA_TYPE_NCHAR) { } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "length", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
} }
cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY); cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY);
RAW_NULL_CHECK(isPk); RAW_NULL_CHECK(isPk);
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "isPrimarykey", isPk)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "isPrimarykey", isPk));
RAW_FALSE_CHECK(cJSON_AddItemToArray(columns, column));
if (pColCmprRow == NULL) { if (pColCmprRow == NULL) {
continue; continue;
@ -124,44 +142,44 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche
cJSON* encodeJson = cJSON_CreateString(encode); cJSON* encodeJson = cJSON_CreateString(encode);
RAW_NULL_CHECK(encodeJson); RAW_NULL_CHECK(encodeJson);
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "encode", encodeJson)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "encode", encodeJson));
cJSON* compressJson = cJSON_CreateString(compress); cJSON* compressJson = cJSON_CreateString(compress);
RAW_NULL_CHECK(compressJson); RAW_NULL_CHECK(compressJson);
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "compress", compressJson)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "compress", compressJson));
cJSON* levelJson = cJSON_CreateString(level); cJSON* levelJson = cJSON_CreateString(level);
RAW_NULL_CHECK(levelJson); RAW_NULL_CHECK(levelJson);
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "level", levelJson)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "level", levelJson));
} }
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "columns", columns));
cJSON* tags = cJSON_CreateArray(); cJSON* tags = cJSON_CreateArray();
RAW_NULL_CHECK(tags); RAW_NULL_CHECK(tags);
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
for (int i = 0; schemaTag && i < schemaTag->nCols; i++) { for (int i = 0; schemaTag && i < schemaTag->nCols; i++) {
cJSON* tag = cJSON_CreateObject(); cJSON* tag = cJSON_CreateObject();
RAW_NULL_CHECK(tag); RAW_NULL_CHECK(tag);
RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
SSchema* s = schemaTag->pSchema + i; SSchema* s = schemaTag->pSchema + i;
cJSON* tname = cJSON_CreateString(s->name); cJSON* tname = cJSON_CreateString(s->name);
RAW_NULL_CHECK(tname); RAW_NULL_CHECK(tname);
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "name", tname)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
cJSON* ttype = cJSON_CreateNumber(s->type); cJSON* ttype = cJSON_CreateNumber(s->type);
RAW_NULL_CHECK(ttype); RAW_NULL_CHECK(ttype);
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "type", ttype)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) { if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
int32_t length = s->bytes - VARSTR_HEADER_SIZE; int32_t length = s->bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "length", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
} else if (s->type == TSDB_DATA_TYPE_NCHAR) { } else if (s->type == TSDB_DATA_TYPE_NCHAR) {
int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "length", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
} }
RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag));
} }
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags));
end: end:
*pJson = json; *pJson = json;
@ -175,7 +193,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) {
RAW_NULL_CHECK(encodeStr); RAW_NULL_CHECK(encodeStr);
cJSON* encodeJson = cJSON_CreateString(encodeStr); cJSON* encodeJson = cJSON_CreateString(encodeStr);
RAW_NULL_CHECK(encodeJson); RAW_NULL_CHECK(encodeJson);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "encode", encodeJson)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "encode", encodeJson));
return code; return code;
} }
uint8_t compress = COMPRESS_L2_TYPE_U32(para); uint8_t compress = COMPRESS_L2_TYPE_U32(para);
@ -184,7 +202,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) {
RAW_NULL_CHECK(compressStr); RAW_NULL_CHECK(compressStr);
cJSON* compressJson = cJSON_CreateString(compressStr); cJSON* compressJson = cJSON_CreateString(compressStr);
RAW_NULL_CHECK(compressJson); RAW_NULL_CHECK(compressJson);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "compress", compressJson)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "compress", compressJson));
return code; return code;
} }
uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para); uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para);
@ -193,7 +211,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) {
RAW_NULL_CHECK(levelStr); RAW_NULL_CHECK(levelStr);
cJSON* levelJson = cJSON_CreateString(levelStr); cJSON* levelJson = cJSON_CreateString(levelStr);
RAW_NULL_CHECK(levelJson); RAW_NULL_CHECK(levelJson);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "level", levelJson)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "level", levelJson));
return code; return code;
} }
@ -214,19 +232,19 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON**
RAW_NULL_CHECK(json); RAW_NULL_CHECK(json);
cJSON* type = cJSON_CreateString("alter"); cJSON* type = cJSON_CreateString("alter");
RAW_NULL_CHECK(type); RAW_NULL_CHECK(type);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
SName name = {0}; SName name = {0};
RAW_RETURN_CHECK(tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)); RAW_RETURN_CHECK(tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
cJSON* tableType = cJSON_CreateString("super"); cJSON* tableType = cJSON_CreateString("super");
RAW_NULL_CHECK(tableType); RAW_NULL_CHECK(tableType);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
cJSON* tableName = cJSON_CreateString(name.tname); cJSON* tableName = cJSON_CreateString(name.tname);
RAW_NULL_CHECK(tableName); RAW_NULL_CHECK(tableName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
cJSON* alterType = cJSON_CreateNumber(req.alterType); cJSON* alterType = cJSON_CreateNumber(req.alterType);
RAW_NULL_CHECK(alterType); RAW_NULL_CHECK(alterType);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "alterType", alterType)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType));
switch (req.alterType) { switch (req.alterType) {
case TSDB_ALTER_TABLE_ADD_TAG: case TSDB_ALTER_TABLE_ADD_TAG:
case TSDB_ALTER_TABLE_ADD_COLUMN: { case TSDB_ALTER_TABLE_ADD_COLUMN: {
@ -234,22 +252,22 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON**
RAW_NULL_CHECK(field); RAW_NULL_CHECK(field);
cJSON* colName = cJSON_CreateString(field->name); cJSON* colName = cJSON_CreateString(field->name);
RAW_NULL_CHECK(colName); RAW_NULL_CHECK(colName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
cJSON* colType = cJSON_CreateNumber(field->type); cJSON* colType = cJSON_CreateNumber(field->type);
RAW_NULL_CHECK(colType); RAW_NULL_CHECK(colType);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY || if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
field->type == TSDB_DATA_TYPE_GEOMETRY) { field->type == TSDB_DATA_TYPE_GEOMETRY) {
int32_t length = field->bytes - VARSTR_HEADER_SIZE; int32_t length = field->bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
} else if (field->type == TSDB_DATA_TYPE_NCHAR) { } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
} }
break; break;
} }
@ -258,22 +276,22 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON**
RAW_NULL_CHECK(field); RAW_NULL_CHECK(field);
cJSON* colName = cJSON_CreateString(field->name); cJSON* colName = cJSON_CreateString(field->name);
RAW_NULL_CHECK(colName); RAW_NULL_CHECK(colName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
cJSON* colType = cJSON_CreateNumber(field->type); cJSON* colType = cJSON_CreateNumber(field->type);
RAW_NULL_CHECK(colType); RAW_NULL_CHECK(colType);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY || if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
field->type == TSDB_DATA_TYPE_GEOMETRY) { field->type == TSDB_DATA_TYPE_GEOMETRY) {
int32_t length = field->bytes - VARSTR_HEADER_SIZE; int32_t length = field->bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
} else if (field->type == TSDB_DATA_TYPE_NCHAR) { } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
} }
RAW_RETURN_CHECK(setCompressOption(json, field->compress)); RAW_RETURN_CHECK(setCompressOption(json, field->compress));
break; break;
@ -284,7 +302,7 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON**
RAW_NULL_CHECK(field); RAW_NULL_CHECK(field);
cJSON* colName = cJSON_CreateString(field->name); cJSON* colName = cJSON_CreateString(field->name);
RAW_NULL_CHECK(colName); RAW_NULL_CHECK(colName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
break; break;
} }
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES: case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
@ -293,21 +311,21 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON**
RAW_NULL_CHECK(field); RAW_NULL_CHECK(field);
cJSON* colName = cJSON_CreateString(field->name); cJSON* colName = cJSON_CreateString(field->name);
RAW_NULL_CHECK(colName); RAW_NULL_CHECK(colName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
cJSON* colType = cJSON_CreateNumber(field->type); cJSON* colType = cJSON_CreateNumber(field->type);
RAW_NULL_CHECK(colType); RAW_NULL_CHECK(colType);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY || if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
field->type == TSDB_DATA_TYPE_GEOMETRY) { field->type == TSDB_DATA_TYPE_GEOMETRY) {
int32_t length = field->bytes - VARSTR_HEADER_SIZE; int32_t length = field->bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
} else if (field->type == TSDB_DATA_TYPE_NCHAR) { } else if (field->type == TSDB_DATA_TYPE_NCHAR) {
int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
} }
break; break;
} }
@ -319,10 +337,10 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON**
RAW_NULL_CHECK(newField); RAW_NULL_CHECK(newField);
cJSON* colName = cJSON_CreateString(oldField->name); cJSON* colName = cJSON_CreateString(oldField->name);
RAW_NULL_CHECK(colName); RAW_NULL_CHECK(colName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
cJSON* colNewName = cJSON_CreateString(newField->name); cJSON* colNewName = cJSON_CreateString(newField->name);
RAW_NULL_CHECK(colNewName); RAW_NULL_CHECK(colNewName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colNewName", colNewName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName));
break; break;
} }
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: { case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
@ -330,7 +348,7 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON**
RAW_NULL_CHECK(field); RAW_NULL_CHECK(field);
cJSON* colName = cJSON_CreateString(field->name); cJSON* colName = cJSON_CreateString(field->name);
RAW_NULL_CHECK(colName); RAW_NULL_CHECK(colName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
RAW_RETURN_CHECK(setCompressOption(json, field->bytes)); RAW_RETURN_CHECK(setCompressOption(json, field->bytes));
break; break;
} }
@ -391,51 +409,47 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
int64_t id = pCreateReq->uid; int64_t id = pCreateReq->uid;
uint8_t tagNum = pCreateReq->ctb.tagNum; uint8_t tagNum = pCreateReq->ctb.tagNum;
int32_t code = 0; int32_t code = 0;
cJSON* tags = NULL; SArray* pTagVals = NULL;
char* pJson = NULL;
cJSON* tableName = cJSON_CreateString(name); cJSON* tableName = cJSON_CreateString(name);
RAW_NULL_CHECK(tableName); RAW_NULL_CHECK(tableName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
cJSON* using = cJSON_CreateString(sname); cJSON* using = cJSON_CreateString(sname);
RAW_NULL_CHECK(using); RAW_NULL_CHECK(using);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "using", using)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "using", using));
cJSON* tagNumJson = cJSON_CreateNumber(tagNum); cJSON* tagNumJson = cJSON_CreateNumber(tagNum);
RAW_NULL_CHECK(tagNumJson); RAW_NULL_CHECK(tagNumJson);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tagNum", tagNumJson)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tagNum", tagNumJson));
tags = cJSON_CreateArray(); cJSON* tags = cJSON_CreateArray();
RAW_NULL_CHECK(tags); RAW_NULL_CHECK(tags);
SArray* pTagVals = NULL; RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
RAW_RETURN_CHECK(tTagToValArray(pTag, &pTagVals)); RAW_RETURN_CHECK(tTagToValArray(pTag, &pTagVals));
if (tTagIsJson(pTag)) { if (tTagIsJson(pTag)) {
STag* p = (STag*)pTag; STag* p = (STag*)pTag;
if (p->nTag == 0) { if (p->nTag == 0) {
uError("p->nTag == 0"); uError("p->nTag == 0");
goto end; goto end;
} }
char* pJson = NULL;
parseTagDatatoJson(pTag, &pJson); parseTagDatatoJson(pTag, &pJson);
if (pJson == NULL) { RAW_NULL_CHECK(pJson);
uError("parseTagDatatoJson failed, pJson == NULL");
goto end;
}
cJSON* tag = cJSON_CreateObject(); cJSON* tag = cJSON_CreateObject();
RAW_NULL_CHECK(tag); RAW_NULL_CHECK(tag);
RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
STagVal* pTagVal = taosArrayGet(pTagVals, 0); STagVal* pTagVal = taosArrayGet(pTagVals, 0);
RAW_NULL_CHECK(pTagVal); RAW_NULL_CHECK(pTagVal);
char* ptname = taosArrayGet(tagName, 0); char* ptname = taosArrayGet(tagName, 0);
RAW_NULL_CHECK(ptname); RAW_NULL_CHECK(ptname);
cJSON* tname = cJSON_CreateString(ptname); cJSON* tname = cJSON_CreateString(ptname);
RAW_NULL_CHECK(tname); RAW_NULL_CHECK(tname);
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "name", tname)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON); cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
RAW_NULL_CHECK(ttype); RAW_NULL_CHECK(ttype);
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "type", ttype)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
cJSON* tvalue = cJSON_CreateString(pJson); cJSON* tvalue = cJSON_CreateString(pJson);
RAW_NULL_CHECK(tvalue); RAW_NULL_CHECK(tvalue);
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "value", tvalue)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag));
taosMemoryFree(pJson);
goto end; goto end;
} }
@ -444,36 +458,34 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
RAW_NULL_CHECK(pTagVal); RAW_NULL_CHECK(pTagVal);
cJSON* tag = cJSON_CreateObject(); cJSON* tag = cJSON_CreateObject();
RAW_NULL_CHECK(tag); RAW_NULL_CHECK(tag);
RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
char* ptname = taosArrayGet(tagName, i); char* ptname = taosArrayGet(tagName, i);
RAW_NULL_CHECK(ptname); RAW_NULL_CHECK(ptname);
cJSON* tname = cJSON_CreateString(ptname); cJSON* tname = cJSON_CreateString(ptname);
RAW_NULL_CHECK(tname); RAW_NULL_CHECK(tname);
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "name", tname)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
cJSON* ttype = cJSON_CreateNumber(pTagVal->type); cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
RAW_NULL_CHECK(ttype); RAW_NULL_CHECK(ttype);
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "type", ttype)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
cJSON* tvalue = NULL; cJSON* tvalue = NULL;
if (IS_VAR_DATA_TYPE(pTagVal->type)) { if (IS_VAR_DATA_TYPE(pTagVal->type)) {
char* buf = NULL;
int64_t bufSize = 0; int64_t bufSize = 0;
if (pTagVal->type == TSDB_DATA_TYPE_VARBINARY) { if (pTagVal->type == TSDB_DATA_TYPE_VARBINARY) {
bufSize = pTagVal->nData * 2 + 2 + 3; bufSize = pTagVal->nData * 2 + 2 + 3;
} else { } else {
bufSize = pTagVal->nData + 3; bufSize = pTagVal->nData + 3;
} }
buf = taosMemoryCalloc(bufSize, 1); char* buf = taosMemoryCalloc(bufSize, 1);
RAW_NULL_CHECK(buf); RAW_NULL_CHECK(buf);
if (!buf) goto end;
if (dataConverToStr(buf, bufSize, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL) != TSDB_CODE_SUCCESS) { if (dataConverToStr(buf, bufSize, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL) != TSDB_CODE_SUCCESS) {
taosMemoryFree(buf); taosMemoryFree(buf);
goto end; goto end;
} }
tvalue = cJSON_CreateString(buf); tvalue = cJSON_CreateString(buf);
RAW_NULL_CHECK(tvalue);
taosMemoryFree(buf); taosMemoryFree(buf);
RAW_NULL_CHECK(tvalue);
} else { } else {
double val = 0; double val = 0;
GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64); GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64);
@ -481,12 +493,11 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
RAW_NULL_CHECK(tvalue); RAW_NULL_CHECK(tvalue);
} }
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "value", tvalue)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag));
} }
end: end:
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags)); taosMemoryFree(pJson);
taosArrayDestroy(pTagVals); taosArrayDestroy(pTagVals);
} }
@ -497,22 +508,23 @@ static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSO
RAW_NULL_CHECK(json); RAW_NULL_CHECK(json);
cJSON* type = cJSON_CreateString("create"); cJSON* type = cJSON_CreateString("create");
RAW_NULL_CHECK(type); RAW_NULL_CHECK(type);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
cJSON* tableType = cJSON_CreateString("child"); cJSON* tableType = cJSON_CreateString("child");
RAW_NULL_CHECK(tableType); RAW_NULL_CHECK(tableType);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
buildChildElement(json, pCreateReq); buildChildElement(json, pCreateReq);
cJSON* createList = cJSON_CreateArray(); cJSON* createList = cJSON_CreateArray();
RAW_NULL_CHECK(createList); RAW_NULL_CHECK(createList);
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "createList", createList));
for (int i = 0; nReqs > 1 && i < nReqs; i++) { for (int i = 0; nReqs > 1 && i < nReqs; i++) {
cJSON* create = cJSON_CreateObject(); cJSON* create = cJSON_CreateObject();
RAW_NULL_CHECK(create); RAW_NULL_CHECK(create);
buildChildElement(create, pCreateReq + i); buildChildElement(create, pCreateReq + i);
RAW_FALSE_CHECK(cJSON_AddItemToArray(createList, create)); RAW_FALSE_CHECK(tmqAddJsonArrayItem(createList, create));
} }
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "createList", createList));
end: end:
*pJson = json; *pJson = json;
@ -619,62 +631,62 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
RAW_NULL_CHECK(json); RAW_NULL_CHECK(json);
cJSON* type = cJSON_CreateString("alter"); cJSON* type = cJSON_CreateString("alter");
RAW_NULL_CHECK(type); RAW_NULL_CHECK(type);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL || cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL
? "child" ? "child"
: "normal"); : "normal");
RAW_NULL_CHECK(tableType); RAW_NULL_CHECK(tableType);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName); cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
RAW_NULL_CHECK(tableName); RAW_NULL_CHECK(tableName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action); cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
RAW_NULL_CHECK(alterType); RAW_NULL_CHECK(alterType);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "alterType", alterType)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType));
switch (vAlterTbReq.action) { switch (vAlterTbReq.action) {
case TSDB_ALTER_TABLE_ADD_COLUMN: { case TSDB_ALTER_TABLE_ADD_COLUMN: {
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
RAW_NULL_CHECK(colName); RAW_NULL_CHECK(colName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type); cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
RAW_NULL_CHECK(colType); RAW_NULL_CHECK(colType);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY || if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY ||
vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) { vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE; int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
} else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) { } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
} }
break; break;
} }
case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: { case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
RAW_NULL_CHECK(colName); RAW_NULL_CHECK(colName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type); cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
RAW_NULL_CHECK(colType); RAW_NULL_CHECK(colType);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY || if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY ||
vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) { vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE; int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
} else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) { } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
} }
RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress)); RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
break; break;
@ -682,43 +694,43 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
case TSDB_ALTER_TABLE_DROP_COLUMN: { case TSDB_ALTER_TABLE_DROP_COLUMN: {
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
RAW_NULL_CHECK(colName); RAW_NULL_CHECK(colName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
break; break;
} }
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: { case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
RAW_NULL_CHECK(colName); RAW_NULL_CHECK(colName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType); cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
RAW_NULL_CHECK(colType); RAW_NULL_CHECK(colType);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_VARBINARY || if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_VARBINARY ||
vAlterTbReq.colModType == TSDB_DATA_TYPE_GEOMETRY) { vAlterTbReq.colModType == TSDB_DATA_TYPE_GEOMETRY) {
int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE; int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
} else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) { } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
cJSON* cbytes = cJSON_CreateNumber(length); cJSON* cbytes = cJSON_CreateNumber(length);
RAW_NULL_CHECK(cbytes); RAW_NULL_CHECK(cbytes);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
} }
break; break;
} }
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: { case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
RAW_NULL_CHECK(colName); RAW_NULL_CHECK(colName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName); cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
RAW_NULL_CHECK(colNewName); RAW_NULL_CHECK(colNewName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colNewName", colNewName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName));
break; break;
} }
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: { case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName); cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
RAW_NULL_CHECK(tagName); RAW_NULL_CHECK(tagName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", tagName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", tagName));
bool isNull = vAlterTbReq.isNull; bool isNull = vAlterTbReq.isNull;
if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) { if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
@ -757,12 +769,12 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
cJSON* colValue = cJSON_CreateString(buf); cJSON* colValue = cJSON_CreateString(buf);
taosMemoryFree(buf); taosMemoryFree(buf);
RAW_NULL_CHECK(colValue); RAW_NULL_CHECK(colValue);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colValue", colValue)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValue", colValue));
} }
cJSON* isNullCJson = cJSON_CreateBool(isNull); cJSON* isNullCJson = cJSON_CreateBool(isNull);
RAW_NULL_CHECK(isNullCJson); RAW_NULL_CHECK(isNullCJson);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colValueNull", isNullCJson)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValueNull", isNullCJson));
break; break;
} }
case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL: { case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL: {
@ -774,14 +786,17 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
cJSON* tags = cJSON_CreateArray(); cJSON* tags = cJSON_CreateArray();
RAW_NULL_CHECK(tags); RAW_NULL_CHECK(tags);
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
for (int32_t i = 0; i < nTags; i++) { for (int32_t i = 0; i < nTags; i++) {
cJSON* member = cJSON_CreateObject(); cJSON* member = cJSON_CreateObject();
RAW_NULL_CHECK(member); RAW_NULL_CHECK(member);
RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, member));
SMultiTagUpateVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i); SMultiTagUpateVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i);
cJSON* tagName = cJSON_CreateString(pTagVal->tagName); cJSON* tagName = cJSON_CreateString(pTagVal->tagName);
RAW_NULL_CHECK(tagName); RAW_NULL_CHECK(tagName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(member, "colName", tagName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colName", tagName));
if (pTagVal->tagType == TSDB_DATA_TYPE_JSON) { if (pTagVal->tagType == TSDB_DATA_TYPE_JSON) {
uError("processAlterTable isJson false"); uError("processAlterTable isJson false");
@ -789,14 +804,13 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
} }
bool isNull = pTagVal->isNull; bool isNull = pTagVal->isNull;
if (!isNull) { if (!isNull) {
char* buf = NULL;
int64_t bufSize = 0; int64_t bufSize = 0;
if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) { if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) {
bufSize = pTagVal->nTagVal * 2 + 2 + 3; bufSize = pTagVal->nTagVal * 2 + 2 + 3;
} else { } else {
bufSize = pTagVal->nTagVal + 3; bufSize = pTagVal->nTagVal + 3;
} }
buf = taosMemoryCalloc(bufSize, 1); char* buf = taosMemoryCalloc(bufSize, 1);
RAW_NULL_CHECK(buf); RAW_NULL_CHECK(buf);
if (dataConverToStr(buf, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL) != if (dataConverToStr(buf, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL) !=
TSDB_CODE_SUCCESS) { TSDB_CODE_SUCCESS) {
@ -806,21 +820,19 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
cJSON* colValue = cJSON_CreateString(buf); cJSON* colValue = cJSON_CreateString(buf);
taosMemoryFree(buf); taosMemoryFree(buf);
RAW_NULL_CHECK(colValue); RAW_NULL_CHECK(colValue);
RAW_FALSE_CHECK(cJSON_AddItemToObject(member, "colValue", colValue)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValue", colValue));
} }
cJSON* isNullCJson = cJSON_CreateBool(isNull); cJSON* isNullCJson = cJSON_CreateBool(isNull);
RAW_NULL_CHECK(isNullCJson); RAW_NULL_CHECK(isNullCJson);
RAW_FALSE_CHECK(cJSON_AddItemToObject(member, "colValueNull", isNullCJson)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValueNull", isNullCJson));
RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, member));
} }
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags));
break; break;
} }
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: { case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
RAW_NULL_CHECK(colName); RAW_NULL_CHECK(colName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress)); RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
break; break;
} }
@ -858,13 +870,13 @@ static void processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
RAW_NULL_CHECK(json); RAW_NULL_CHECK(json);
cJSON* type = cJSON_CreateString("drop"); cJSON* type = cJSON_CreateString("drop");
RAW_NULL_CHECK(type); RAW_NULL_CHECK(type);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
cJSON* tableType = cJSON_CreateString("super"); cJSON* tableType = cJSON_CreateString("super");
RAW_NULL_CHECK(tableType); RAW_NULL_CHECK(tableType);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
cJSON* tableName = cJSON_CreateString(req.name); cJSON* tableName = cJSON_CreateString(req.name);
RAW_NULL_CHECK(tableName); RAW_NULL_CHECK(tableName);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
end: end:
uDebug("processDropSTable return"); uDebug("processDropSTable return");
@ -897,10 +909,10 @@ static void processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
RAW_NULL_CHECK(json); RAW_NULL_CHECK(json);
cJSON* type = cJSON_CreateString("delete"); cJSON* type = cJSON_CreateString("delete");
RAW_NULL_CHECK(type); RAW_NULL_CHECK(type);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
cJSON* sqlJson = cJSON_CreateString(sql); cJSON* sqlJson = cJSON_CreateString(sql);
RAW_NULL_CHECK(sqlJson); RAW_NULL_CHECK(sqlJson);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", sqlJson)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "sql", sqlJson));
end: end:
uDebug("processDeleteTable return"); uDebug("processDeleteTable return");
@ -928,16 +940,17 @@ static void processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
RAW_NULL_CHECK(json); RAW_NULL_CHECK(json);
cJSON* type = cJSON_CreateString("drop"); cJSON* type = cJSON_CreateString("drop");
RAW_NULL_CHECK(type); RAW_NULL_CHECK(type);
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
cJSON* tableNameList = cJSON_CreateArray(); cJSON* tableNameList = cJSON_CreateArray();
RAW_NULL_CHECK(tableNameList); RAW_NULL_CHECK(tableNameList);
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableNameList", tableNameList));
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
SVDropTbReq* pDropTbReq = req.pReqs + iReq; SVDropTbReq* pDropTbReq = req.pReqs + iReq;
cJSON* tableName = cJSON_CreateString(pDropTbReq->name); cJSON* tableName = cJSON_CreateString(pDropTbReq->name);
RAW_NULL_CHECK(tableName); RAW_NULL_CHECK(tableName);
RAW_FALSE_CHECK(cJSON_AddItemToArray(tableNameList, tableName)); RAW_FALSE_CHECK(tmqAddJsonArrayItem(tableNameList, tableName));
} }
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableNameList", tableNameList));
end: end:
uDebug("processDropTable return"); uDebug("processDropTable return");
@ -2183,6 +2196,8 @@ static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION)); RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION));
cJSON* pMetaArr = cJSON_CreateArray(); cJSON* pMetaArr = cJSON_CreateArray();
RAW_NULL_CHECK(pMetaArr); RAW_NULL_CHECK(pMetaArr);
RAW_FALSE_CHECK(tmqAddJsonObjectItem(pJson, "metas", pMetaArr));
int32_t num = taosArrayGetSize(rsp.batchMetaReq); int32_t num = taosArrayGetSize(rsp.batchMetaReq);
for (int32_t i = 0; i < num; i++) { for (int32_t i = 0; i < num; i++) {
int32_t* len = taosArrayGet(rsp.batchMetaLen, i); int32_t* len = taosArrayGet(rsp.batchMetaLen, i);
@ -2198,10 +2213,9 @@ static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
cJSON* pItem = NULL; cJSON* pItem = NULL;
processSimpleMeta(&metaRsp, &pItem); processSimpleMeta(&metaRsp, &pItem);
tDeleteMqMetaRsp(&metaRsp); tDeleteMqMetaRsp(&metaRsp);
RAW_FALSE_CHECK(cJSON_AddItemToArray(pMetaArr, pItem)); RAW_FALSE_CHECK(tmqAddJsonArrayItem(pMetaArr, pItem));
} }
RAW_FALSE_CHECK(cJSON_AddItemToObject(pJson, "metas", pMetaArr));
tDeleteMqBatchMetaRsp(&rsp); tDeleteMqBatchMetaRsp(&rsp);
char* fullStr = cJSON_PrintUnformatted(pJson); char* fullStr = cJSON_PrintUnformatted(pJson);
cJSON_Delete(pJson); cJSON_Delete(pJson);