fix:[TD-33057] memory leak
This commit is contained in:
parent
2907f6a5a8
commit
3e9b303b0b
|
@ -52,6 +52,22 @@
|
||||||
|
|
||||||
#define TMQ_META_VERSION "1.0"
|
#define TMQ_META_VERSION "1.0"
|
||||||
|
|
||||||
|
static bool tmqAddJsonObjectItem(cJSON *object, const char *string, cJSON *item){
|
||||||
|
bool ret = cJSON_AddItemToObject(object, string, item);
|
||||||
|
if (!ret){
|
||||||
|
cJSON_Delete(item);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
static bool tmqAddJsonArrayItem(cJSON *array, cJSON *item){
|
||||||
|
bool ret = cJSON_AddItemToArray(array, item);
|
||||||
|
if (!ret){
|
||||||
|
cJSON_Delete(item);
|
||||||
|
}
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen);
|
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen);
|
||||||
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
|
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
|
||||||
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t,
|
static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t,
|
||||||
|
@ -68,13 +84,13 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche
|
||||||
cJSON* type = cJSON_CreateString("create");
|
cJSON* type = cJSON_CreateString("create");
|
||||||
RAW_NULL_CHECK(type);
|
RAW_NULL_CHECK(type);
|
||||||
|
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
|
||||||
cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
|
cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
|
||||||
RAW_NULL_CHECK(tableType);
|
RAW_NULL_CHECK(tableType);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
|
||||||
cJSON* tableName = cJSON_CreateString(name);
|
cJSON* tableName = cJSON_CreateString(name);
|
||||||
RAW_NULL_CHECK(tableName);
|
RAW_NULL_CHECK(tableName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
|
||||||
|
|
||||||
cJSON* columns = cJSON_CreateArray();
|
cJSON* columns = cJSON_CreateArray();
|
||||||
RAW_NULL_CHECK(columns);
|
RAW_NULL_CHECK(columns);
|
||||||
|
@ -84,25 +100,25 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche
|
||||||
SSchema* s = schemaRow->pSchema + i;
|
SSchema* s = schemaRow->pSchema + i;
|
||||||
cJSON* cname = cJSON_CreateString(s->name);
|
cJSON* cname = cJSON_CreateString(s->name);
|
||||||
RAW_NULL_CHECK(cname);
|
RAW_NULL_CHECK(cname);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "name", cname));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "name", cname));
|
||||||
cJSON* ctype = cJSON_CreateNumber(s->type);
|
cJSON* ctype = cJSON_CreateNumber(s->type);
|
||||||
RAW_NULL_CHECK(ctype);
|
RAW_NULL_CHECK(ctype);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "type", ctype));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "type", ctype));
|
||||||
if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
|
if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||||
int32_t length = s->bytes - VARSTR_HEADER_SIZE;
|
int32_t length = s->bytes - VARSTR_HEADER_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "length", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
|
||||||
} else if (s->type == TSDB_DATA_TYPE_NCHAR) {
|
} else if (s->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "length", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes));
|
||||||
}
|
}
|
||||||
cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY);
|
cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY);
|
||||||
RAW_NULL_CHECK(isPk);
|
RAW_NULL_CHECK(isPk);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "isPrimarykey", isPk));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "isPrimarykey", isPk));
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToArray(columns, column));
|
RAW_FALSE_CHECK(tmqAddJsonArrayItem(columns, column));
|
||||||
|
|
||||||
if (pColCmprRow == NULL) {
|
if (pColCmprRow == NULL) {
|
||||||
continue;
|
continue;
|
||||||
|
@ -124,17 +140,17 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche
|
||||||
|
|
||||||
cJSON* encodeJson = cJSON_CreateString(encode);
|
cJSON* encodeJson = cJSON_CreateString(encode);
|
||||||
RAW_NULL_CHECK(encodeJson);
|
RAW_NULL_CHECK(encodeJson);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "encode", encodeJson));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "encode", encodeJson));
|
||||||
|
|
||||||
cJSON* compressJson = cJSON_CreateString(compress);
|
cJSON* compressJson = cJSON_CreateString(compress);
|
||||||
RAW_NULL_CHECK(compressJson);
|
RAW_NULL_CHECK(compressJson);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "compress", compressJson));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "compress", compressJson));
|
||||||
|
|
||||||
cJSON* levelJson = cJSON_CreateString(level);
|
cJSON* levelJson = cJSON_CreateString(level);
|
||||||
RAW_NULL_CHECK(levelJson);
|
RAW_NULL_CHECK(levelJson);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "level", levelJson));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "level", levelJson));
|
||||||
}
|
}
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "columns", columns));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "columns", columns));
|
||||||
|
|
||||||
cJSON* tags = cJSON_CreateArray();
|
cJSON* tags = cJSON_CreateArray();
|
||||||
RAW_NULL_CHECK(tags);
|
RAW_NULL_CHECK(tags);
|
||||||
|
@ -144,24 +160,24 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche
|
||||||
SSchema* s = schemaTag->pSchema + i;
|
SSchema* s = schemaTag->pSchema + i;
|
||||||
cJSON* tname = cJSON_CreateString(s->name);
|
cJSON* tname = cJSON_CreateString(s->name);
|
||||||
RAW_NULL_CHECK(tname);
|
RAW_NULL_CHECK(tname);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "name", tname));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
|
||||||
cJSON* ttype = cJSON_CreateNumber(s->type);
|
cJSON* ttype = cJSON_CreateNumber(s->type);
|
||||||
RAW_NULL_CHECK(ttype);
|
RAW_NULL_CHECK(ttype);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "type", ttype));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
|
||||||
if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
|
if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||||
int32_t length = s->bytes - VARSTR_HEADER_SIZE;
|
int32_t length = s->bytes - VARSTR_HEADER_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "length", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
|
||||||
} else if (s->type == TSDB_DATA_TYPE_NCHAR) {
|
} else if (s->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "length", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes));
|
||||||
}
|
}
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag));
|
RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
|
||||||
}
|
}
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
|
||||||
|
|
||||||
end:
|
end:
|
||||||
*pJson = json;
|
*pJson = json;
|
||||||
|
@ -175,7 +191,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) {
|
||||||
RAW_NULL_CHECK(encodeStr);
|
RAW_NULL_CHECK(encodeStr);
|
||||||
cJSON* encodeJson = cJSON_CreateString(encodeStr);
|
cJSON* encodeJson = cJSON_CreateString(encodeStr);
|
||||||
RAW_NULL_CHECK(encodeJson);
|
RAW_NULL_CHECK(encodeJson);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "encode", encodeJson));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "encode", encodeJson));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
uint8_t compress = COMPRESS_L2_TYPE_U32(para);
|
uint8_t compress = COMPRESS_L2_TYPE_U32(para);
|
||||||
|
@ -184,7 +200,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) {
|
||||||
RAW_NULL_CHECK(compressStr);
|
RAW_NULL_CHECK(compressStr);
|
||||||
cJSON* compressJson = cJSON_CreateString(compressStr);
|
cJSON* compressJson = cJSON_CreateString(compressStr);
|
||||||
RAW_NULL_CHECK(compressJson);
|
RAW_NULL_CHECK(compressJson);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "compress", compressJson));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "compress", compressJson));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para);
|
uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para);
|
||||||
|
@ -193,7 +209,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) {
|
||||||
RAW_NULL_CHECK(levelStr);
|
RAW_NULL_CHECK(levelStr);
|
||||||
cJSON* levelJson = cJSON_CreateString(levelStr);
|
cJSON* levelJson = cJSON_CreateString(levelStr);
|
||||||
RAW_NULL_CHECK(levelJson);
|
RAW_NULL_CHECK(levelJson);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "level", levelJson));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "level", levelJson));
|
||||||
return code;
|
return code;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -214,19 +230,19 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON**
|
||||||
RAW_NULL_CHECK(json);
|
RAW_NULL_CHECK(json);
|
||||||
cJSON* type = cJSON_CreateString("alter");
|
cJSON* type = cJSON_CreateString("alter");
|
||||||
RAW_NULL_CHECK(type);
|
RAW_NULL_CHECK(type);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
|
||||||
SName name = {0};
|
SName name = {0};
|
||||||
RAW_RETURN_CHECK(tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
|
RAW_RETURN_CHECK(tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE));
|
||||||
cJSON* tableType = cJSON_CreateString("super");
|
cJSON* tableType = cJSON_CreateString("super");
|
||||||
RAW_NULL_CHECK(tableType);
|
RAW_NULL_CHECK(tableType);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
|
||||||
cJSON* tableName = cJSON_CreateString(name.tname);
|
cJSON* tableName = cJSON_CreateString(name.tname);
|
||||||
RAW_NULL_CHECK(tableName);
|
RAW_NULL_CHECK(tableName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
|
||||||
|
|
||||||
cJSON* alterType = cJSON_CreateNumber(req.alterType);
|
cJSON* alterType = cJSON_CreateNumber(req.alterType);
|
||||||
RAW_NULL_CHECK(alterType);
|
RAW_NULL_CHECK(alterType);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "alterType", alterType));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType));
|
||||||
switch (req.alterType) {
|
switch (req.alterType) {
|
||||||
case TSDB_ALTER_TABLE_ADD_TAG:
|
case TSDB_ALTER_TABLE_ADD_TAG:
|
||||||
case TSDB_ALTER_TABLE_ADD_COLUMN: {
|
case TSDB_ALTER_TABLE_ADD_COLUMN: {
|
||||||
|
@ -234,22 +250,22 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON**
|
||||||
RAW_NULL_CHECK(field);
|
RAW_NULL_CHECK(field);
|
||||||
cJSON* colName = cJSON_CreateString(field->name);
|
cJSON* colName = cJSON_CreateString(field->name);
|
||||||
RAW_NULL_CHECK(colName);
|
RAW_NULL_CHECK(colName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
|
||||||
cJSON* colType = cJSON_CreateNumber(field->type);
|
cJSON* colType = cJSON_CreateNumber(field->type);
|
||||||
RAW_NULL_CHECK(colType);
|
RAW_NULL_CHECK(colType);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
|
||||||
|
|
||||||
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
|
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
|
||||||
field->type == TSDB_DATA_TYPE_GEOMETRY) {
|
field->type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||||
int32_t length = field->bytes - VARSTR_HEADER_SIZE;
|
int32_t length = field->bytes - VARSTR_HEADER_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
|
||||||
} else if (field->type == TSDB_DATA_TYPE_NCHAR) {
|
} else if (field->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -258,22 +274,22 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON**
|
||||||
RAW_NULL_CHECK(field);
|
RAW_NULL_CHECK(field);
|
||||||
cJSON* colName = cJSON_CreateString(field->name);
|
cJSON* colName = cJSON_CreateString(field->name);
|
||||||
RAW_NULL_CHECK(colName);
|
RAW_NULL_CHECK(colName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
|
||||||
cJSON* colType = cJSON_CreateNumber(field->type);
|
cJSON* colType = cJSON_CreateNumber(field->type);
|
||||||
RAW_NULL_CHECK(colType);
|
RAW_NULL_CHECK(colType);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
|
||||||
|
|
||||||
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
|
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
|
||||||
field->type == TSDB_DATA_TYPE_GEOMETRY) {
|
field->type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||||
int32_t length = field->bytes - VARSTR_HEADER_SIZE;
|
int32_t length = field->bytes - VARSTR_HEADER_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
|
||||||
} else if (field->type == TSDB_DATA_TYPE_NCHAR) {
|
} else if (field->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
|
||||||
}
|
}
|
||||||
RAW_RETURN_CHECK(setCompressOption(json, field->compress));
|
RAW_RETURN_CHECK(setCompressOption(json, field->compress));
|
||||||
break;
|
break;
|
||||||
|
@ -284,7 +300,7 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON**
|
||||||
RAW_NULL_CHECK(field);
|
RAW_NULL_CHECK(field);
|
||||||
cJSON* colName = cJSON_CreateString(field->name);
|
cJSON* colName = cJSON_CreateString(field->name);
|
||||||
RAW_NULL_CHECK(colName);
|
RAW_NULL_CHECK(colName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
|
case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES:
|
||||||
|
@ -293,21 +309,21 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON**
|
||||||
RAW_NULL_CHECK(field);
|
RAW_NULL_CHECK(field);
|
||||||
cJSON* colName = cJSON_CreateString(field->name);
|
cJSON* colName = cJSON_CreateString(field->name);
|
||||||
RAW_NULL_CHECK(colName);
|
RAW_NULL_CHECK(colName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
|
||||||
cJSON* colType = cJSON_CreateNumber(field->type);
|
cJSON* colType = cJSON_CreateNumber(field->type);
|
||||||
RAW_NULL_CHECK(colType);
|
RAW_NULL_CHECK(colType);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
|
||||||
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
|
if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY ||
|
||||||
field->type == TSDB_DATA_TYPE_GEOMETRY) {
|
field->type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||||
int32_t length = field->bytes - VARSTR_HEADER_SIZE;
|
int32_t length = field->bytes - VARSTR_HEADER_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
|
||||||
} else if (field->type == TSDB_DATA_TYPE_NCHAR) {
|
} else if (field->type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -319,10 +335,10 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON**
|
||||||
RAW_NULL_CHECK(newField);
|
RAW_NULL_CHECK(newField);
|
||||||
cJSON* colName = cJSON_CreateString(oldField->name);
|
cJSON* colName = cJSON_CreateString(oldField->name);
|
||||||
RAW_NULL_CHECK(colName);
|
RAW_NULL_CHECK(colName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
|
||||||
cJSON* colNewName = cJSON_CreateString(newField->name);
|
cJSON* colNewName = cJSON_CreateString(newField->name);
|
||||||
RAW_NULL_CHECK(colNewName);
|
RAW_NULL_CHECK(colNewName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colNewName", colNewName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
|
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
|
||||||
|
@ -330,7 +346,7 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON**
|
||||||
RAW_NULL_CHECK(field);
|
RAW_NULL_CHECK(field);
|
||||||
cJSON* colName = cJSON_CreateString(field->name);
|
cJSON* colName = cJSON_CreateString(field->name);
|
||||||
RAW_NULL_CHECK(colName);
|
RAW_NULL_CHECK(colName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
|
||||||
RAW_RETURN_CHECK(setCompressOption(json, field->bytes));
|
RAW_RETURN_CHECK(setCompressOption(json, field->bytes));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -392,21 +408,20 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
|
||||||
uint8_t tagNum = pCreateReq->ctb.tagNum;
|
uint8_t tagNum = pCreateReq->ctb.tagNum;
|
||||||
int32_t code = 0;
|
int32_t code = 0;
|
||||||
cJSON* tags = NULL;
|
cJSON* tags = NULL;
|
||||||
|
SArray* pTagVals = NULL;
|
||||||
cJSON* tableName = cJSON_CreateString(name);
|
cJSON* tableName = cJSON_CreateString(name);
|
||||||
RAW_NULL_CHECK(tableName);
|
RAW_NULL_CHECK(tableName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
|
||||||
cJSON* using = cJSON_CreateString(sname);
|
cJSON* using = cJSON_CreateString(sname);
|
||||||
RAW_NULL_CHECK(using);
|
RAW_NULL_CHECK(using);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "using", using));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "using", using));
|
||||||
cJSON* tagNumJson = cJSON_CreateNumber(tagNum);
|
cJSON* tagNumJson = cJSON_CreateNumber(tagNum);
|
||||||
RAW_NULL_CHECK(tagNumJson);
|
RAW_NULL_CHECK(tagNumJson);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tagNum", tagNumJson));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tagNum", tagNumJson));
|
||||||
|
|
||||||
tags = cJSON_CreateArray();
|
tags = cJSON_CreateArray();
|
||||||
RAW_NULL_CHECK(tags);
|
RAW_NULL_CHECK(tags);
|
||||||
SArray* pTagVals = NULL;
|
|
||||||
RAW_RETURN_CHECK(tTagToValArray(pTag, &pTagVals));
|
RAW_RETURN_CHECK(tTagToValArray(pTag, &pTagVals));
|
||||||
|
|
||||||
if (tTagIsJson(pTag)) {
|
if (tTagIsJson(pTag)) {
|
||||||
STag* p = (STag*)pTag;
|
STag* p = (STag*)pTag;
|
||||||
if (p->nTag == 0) {
|
if (p->nTag == 0) {
|
||||||
|
@ -427,14 +442,14 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
|
||||||
RAW_NULL_CHECK(ptname);
|
RAW_NULL_CHECK(ptname);
|
||||||
cJSON* tname = cJSON_CreateString(ptname);
|
cJSON* tname = cJSON_CreateString(ptname);
|
||||||
RAW_NULL_CHECK(tname);
|
RAW_NULL_CHECK(tname);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "name", tname));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
|
||||||
cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
|
cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
|
||||||
RAW_NULL_CHECK(ttype);
|
RAW_NULL_CHECK(ttype);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "type", ttype));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
|
||||||
cJSON* tvalue = cJSON_CreateString(pJson);
|
cJSON* tvalue = cJSON_CreateString(pJson);
|
||||||
RAW_NULL_CHECK(tvalue);
|
RAW_NULL_CHECK(tvalue);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "value", tvalue));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag));
|
RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
|
||||||
taosMemoryFree(pJson);
|
taosMemoryFree(pJson);
|
||||||
goto end;
|
goto end;
|
||||||
}
|
}
|
||||||
|
@ -448,10 +463,10 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
|
||||||
RAW_NULL_CHECK(ptname);
|
RAW_NULL_CHECK(ptname);
|
||||||
cJSON* tname = cJSON_CreateString(ptname);
|
cJSON* tname = cJSON_CreateString(ptname);
|
||||||
RAW_NULL_CHECK(tname);
|
RAW_NULL_CHECK(tname);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "name", tname));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname));
|
||||||
cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
|
cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
|
||||||
RAW_NULL_CHECK(ttype);
|
RAW_NULL_CHECK(ttype);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "type", ttype));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype));
|
||||||
|
|
||||||
cJSON* tvalue = NULL;
|
cJSON* tvalue = NULL;
|
||||||
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
|
if (IS_VAR_DATA_TYPE(pTagVal->type)) {
|
||||||
|
@ -481,12 +496,12 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
|
||||||
RAW_NULL_CHECK(tvalue);
|
RAW_NULL_CHECK(tvalue);
|
||||||
}
|
}
|
||||||
|
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "value", tvalue));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue));
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag));
|
RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
|
||||||
end:
|
end:
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags));
|
|
||||||
taosArrayDestroy(pTagVals);
|
taosArrayDestroy(pTagVals);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -497,11 +512,11 @@ static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSO
|
||||||
RAW_NULL_CHECK(json);
|
RAW_NULL_CHECK(json);
|
||||||
cJSON* type = cJSON_CreateString("create");
|
cJSON* type = cJSON_CreateString("create");
|
||||||
RAW_NULL_CHECK(type);
|
RAW_NULL_CHECK(type);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
|
||||||
|
|
||||||
cJSON* tableType = cJSON_CreateString("child");
|
cJSON* tableType = cJSON_CreateString("child");
|
||||||
RAW_NULL_CHECK(tableType);
|
RAW_NULL_CHECK(tableType);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
|
||||||
|
|
||||||
buildChildElement(json, pCreateReq);
|
buildChildElement(json, pCreateReq);
|
||||||
cJSON* createList = cJSON_CreateArray();
|
cJSON* createList = cJSON_CreateArray();
|
||||||
|
@ -510,9 +525,9 @@ static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSO
|
||||||
cJSON* create = cJSON_CreateObject();
|
cJSON* create = cJSON_CreateObject();
|
||||||
RAW_NULL_CHECK(create);
|
RAW_NULL_CHECK(create);
|
||||||
buildChildElement(create, pCreateReq + i);
|
buildChildElement(create, pCreateReq + i);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToArray(createList, create));
|
RAW_FALSE_CHECK(tmqAddJsonArrayItem(createList, create));
|
||||||
}
|
}
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "createList", createList));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "createList", createList));
|
||||||
|
|
||||||
end:
|
end:
|
||||||
*pJson = json;
|
*pJson = json;
|
||||||
|
@ -619,62 +634,62 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
|
||||||
RAW_NULL_CHECK(json);
|
RAW_NULL_CHECK(json);
|
||||||
cJSON* type = cJSON_CreateString("alter");
|
cJSON* type = cJSON_CreateString("alter");
|
||||||
RAW_NULL_CHECK(type);
|
RAW_NULL_CHECK(type);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
|
||||||
cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
|
cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL ||
|
||||||
vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL
|
vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL
|
||||||
? "child"
|
? "child"
|
||||||
: "normal");
|
: "normal");
|
||||||
RAW_NULL_CHECK(tableType);
|
RAW_NULL_CHECK(tableType);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
|
||||||
cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
|
cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
|
||||||
RAW_NULL_CHECK(tableName);
|
RAW_NULL_CHECK(tableName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
|
||||||
cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
|
cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action);
|
||||||
RAW_NULL_CHECK(alterType);
|
RAW_NULL_CHECK(alterType);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "alterType", alterType));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType));
|
||||||
|
|
||||||
switch (vAlterTbReq.action) {
|
switch (vAlterTbReq.action) {
|
||||||
case TSDB_ALTER_TABLE_ADD_COLUMN: {
|
case TSDB_ALTER_TABLE_ADD_COLUMN: {
|
||||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||||
RAW_NULL_CHECK(colName);
|
RAW_NULL_CHECK(colName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
|
||||||
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
|
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
|
||||||
RAW_NULL_CHECK(colType);
|
RAW_NULL_CHECK(colType);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
|
||||||
|
|
||||||
if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY ||
|
if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY ||
|
||||||
vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
|
vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||||
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
|
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
|
||||||
} else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
|
} else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
|
case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: {
|
||||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||||
RAW_NULL_CHECK(colName);
|
RAW_NULL_CHECK(colName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
|
||||||
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
|
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
|
||||||
RAW_NULL_CHECK(colType);
|
RAW_NULL_CHECK(colType);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
|
||||||
|
|
||||||
if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY ||
|
if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY ||
|
||||||
vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
|
vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) {
|
||||||
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
|
int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
|
||||||
} else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
|
} else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) {
|
||||||
int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
|
||||||
}
|
}
|
||||||
RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
|
RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
|
||||||
break;
|
break;
|
||||||
|
@ -682,43 +697,43 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
|
||||||
case TSDB_ALTER_TABLE_DROP_COLUMN: {
|
case TSDB_ALTER_TABLE_DROP_COLUMN: {
|
||||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||||
RAW_NULL_CHECK(colName);
|
RAW_NULL_CHECK(colName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
|
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: {
|
||||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||||
RAW_NULL_CHECK(colName);
|
RAW_NULL_CHECK(colName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
|
||||||
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
|
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType);
|
||||||
RAW_NULL_CHECK(colType);
|
RAW_NULL_CHECK(colType);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType));
|
||||||
if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_VARBINARY ||
|
if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_VARBINARY ||
|
||||||
vAlterTbReq.colModType == TSDB_DATA_TYPE_GEOMETRY) {
|
vAlterTbReq.colModType == TSDB_DATA_TYPE_GEOMETRY) {
|
||||||
int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
|
int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
|
||||||
} else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
|
} else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) {
|
||||||
int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE;
|
||||||
cJSON* cbytes = cJSON_CreateNumber(length);
|
cJSON* cbytes = cJSON_CreateNumber(length);
|
||||||
RAW_NULL_CHECK(cbytes);
|
RAW_NULL_CHECK(cbytes);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
|
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: {
|
||||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||||
RAW_NULL_CHECK(colName);
|
RAW_NULL_CHECK(colName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
|
||||||
cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
|
cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
|
||||||
RAW_NULL_CHECK(colNewName);
|
RAW_NULL_CHECK(colNewName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colNewName", colNewName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
|
case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: {
|
||||||
cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
|
cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName);
|
||||||
RAW_NULL_CHECK(tagName);
|
RAW_NULL_CHECK(tagName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", tagName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", tagName));
|
||||||
|
|
||||||
bool isNull = vAlterTbReq.isNull;
|
bool isNull = vAlterTbReq.isNull;
|
||||||
if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
|
if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) {
|
||||||
|
@ -757,12 +772,12 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
|
||||||
cJSON* colValue = cJSON_CreateString(buf);
|
cJSON* colValue = cJSON_CreateString(buf);
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
RAW_NULL_CHECK(colValue);
|
RAW_NULL_CHECK(colValue);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colValue", colValue));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValue", colValue));
|
||||||
}
|
}
|
||||||
|
|
||||||
cJSON* isNullCJson = cJSON_CreateBool(isNull);
|
cJSON* isNullCJson = cJSON_CreateBool(isNull);
|
||||||
RAW_NULL_CHECK(isNullCJson);
|
RAW_NULL_CHECK(isNullCJson);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colValueNull", isNullCJson));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValueNull", isNullCJson));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL: {
|
case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL: {
|
||||||
|
@ -781,7 +796,7 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
|
||||||
SMultiTagUpateVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i);
|
SMultiTagUpateVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i);
|
||||||
cJSON* tagName = cJSON_CreateString(pTagVal->tagName);
|
cJSON* tagName = cJSON_CreateString(pTagVal->tagName);
|
||||||
RAW_NULL_CHECK(tagName);
|
RAW_NULL_CHECK(tagName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(member, "colName", tagName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colName", tagName));
|
||||||
|
|
||||||
if (pTagVal->tagType == TSDB_DATA_TYPE_JSON) {
|
if (pTagVal->tagType == TSDB_DATA_TYPE_JSON) {
|
||||||
uError("processAlterTable isJson false");
|
uError("processAlterTable isJson false");
|
||||||
|
@ -806,21 +821,21 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
|
||||||
cJSON* colValue = cJSON_CreateString(buf);
|
cJSON* colValue = cJSON_CreateString(buf);
|
||||||
taosMemoryFree(buf);
|
taosMemoryFree(buf);
|
||||||
RAW_NULL_CHECK(colValue);
|
RAW_NULL_CHECK(colValue);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(member, "colValue", colValue));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValue", colValue));
|
||||||
}
|
}
|
||||||
cJSON* isNullCJson = cJSON_CreateBool(isNull);
|
cJSON* isNullCJson = cJSON_CreateBool(isNull);
|
||||||
RAW_NULL_CHECK(isNullCJson);
|
RAW_NULL_CHECK(isNullCJson);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(member, "colValueNull", isNullCJson));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValueNull", isNullCJson));
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, member));
|
RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, member));
|
||||||
}
|
}
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
|
case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: {
|
||||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||||
RAW_NULL_CHECK(colName);
|
RAW_NULL_CHECK(colName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName));
|
||||||
RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
|
RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
@ -858,13 +873,13 @@ static void processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
|
||||||
RAW_NULL_CHECK(json);
|
RAW_NULL_CHECK(json);
|
||||||
cJSON* type = cJSON_CreateString("drop");
|
cJSON* type = cJSON_CreateString("drop");
|
||||||
RAW_NULL_CHECK(type);
|
RAW_NULL_CHECK(type);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
|
||||||
cJSON* tableType = cJSON_CreateString("super");
|
cJSON* tableType = cJSON_CreateString("super");
|
||||||
RAW_NULL_CHECK(tableType);
|
RAW_NULL_CHECK(tableType);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType));
|
||||||
cJSON* tableName = cJSON_CreateString(req.name);
|
cJSON* tableName = cJSON_CreateString(req.name);
|
||||||
RAW_NULL_CHECK(tableName);
|
RAW_NULL_CHECK(tableName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName));
|
||||||
|
|
||||||
end:
|
end:
|
||||||
uDebug("processDropSTable return");
|
uDebug("processDropSTable return");
|
||||||
|
@ -897,10 +912,10 @@ static void processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
|
||||||
RAW_NULL_CHECK(json);
|
RAW_NULL_CHECK(json);
|
||||||
cJSON* type = cJSON_CreateString("delete");
|
cJSON* type = cJSON_CreateString("delete");
|
||||||
RAW_NULL_CHECK(type);
|
RAW_NULL_CHECK(type);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
|
||||||
cJSON* sqlJson = cJSON_CreateString(sql);
|
cJSON* sqlJson = cJSON_CreateString(sql);
|
||||||
RAW_NULL_CHECK(sqlJson);
|
RAW_NULL_CHECK(sqlJson);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", sqlJson));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "sql", sqlJson));
|
||||||
|
|
||||||
end:
|
end:
|
||||||
uDebug("processDeleteTable return");
|
uDebug("processDeleteTable return");
|
||||||
|
@ -928,16 +943,16 @@ static void processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) {
|
||||||
RAW_NULL_CHECK(json);
|
RAW_NULL_CHECK(json);
|
||||||
cJSON* type = cJSON_CreateString("drop");
|
cJSON* type = cJSON_CreateString("drop");
|
||||||
RAW_NULL_CHECK(type);
|
RAW_NULL_CHECK(type);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type));
|
||||||
cJSON* tableNameList = cJSON_CreateArray();
|
cJSON* tableNameList = cJSON_CreateArray();
|
||||||
RAW_NULL_CHECK(tableNameList);
|
RAW_NULL_CHECK(tableNameList);
|
||||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||||
SVDropTbReq* pDropTbReq = req.pReqs + iReq;
|
SVDropTbReq* pDropTbReq = req.pReqs + iReq;
|
||||||
cJSON* tableName = cJSON_CreateString(pDropTbReq->name);
|
cJSON* tableName = cJSON_CreateString(pDropTbReq->name);
|
||||||
RAW_NULL_CHECK(tableName);
|
RAW_NULL_CHECK(tableName);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToArray(tableNameList, tableName));
|
RAW_FALSE_CHECK(tmqAddJsonArrayItem(tableNameList, tableName));
|
||||||
}
|
}
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableNameList", tableNameList));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableNameList", tableNameList));
|
||||||
|
|
||||||
end:
|
end:
|
||||||
uDebug("processDropTable return");
|
uDebug("processDropTable return");
|
||||||
|
@ -2198,10 +2213,10 @@ static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) {
|
||||||
cJSON* pItem = NULL;
|
cJSON* pItem = NULL;
|
||||||
processSimpleMeta(&metaRsp, &pItem);
|
processSimpleMeta(&metaRsp, &pItem);
|
||||||
tDeleteMqMetaRsp(&metaRsp);
|
tDeleteMqMetaRsp(&metaRsp);
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToArray(pMetaArr, pItem));
|
RAW_FALSE_CHECK(tmqAddJsonArrayItem(pMetaArr, pItem));
|
||||||
}
|
}
|
||||||
|
|
||||||
RAW_FALSE_CHECK(cJSON_AddItemToObject(pJson, "metas", pMetaArr));
|
RAW_FALSE_CHECK(tmqAddJsonObjectItem(pJson, "metas", pMetaArr));
|
||||||
tDeleteMqBatchMetaRsp(&rsp);
|
tDeleteMqBatchMetaRsp(&rsp);
|
||||||
char* fullStr = cJSON_PrintUnformatted(pJson);
|
char* fullStr = cJSON_PrintUnformatted(pJson);
|
||||||
cJSON_Delete(pJson);
|
cJSON_Delete(pJson);
|
||||||
|
|
Loading…
Reference in New Issue