From 3e9b303b0b26d50d11c26a5263c6243955576a82 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 9 Dec 2024 09:30:29 +0800 Subject: [PATCH 1/4] fix:[TD-33057] memory leak --- source/client/src/clientRawBlockWrite.c | 211 +++++++++++++----------- 1 file changed, 113 insertions(+), 98 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 3a23d38375..987bed1cf5 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -52,6 +52,22 @@ #define TMQ_META_VERSION "1.0" +static bool tmqAddJsonObjectItem(cJSON *object, const char *string, cJSON *item){ + bool ret = cJSON_AddItemToObject(object, string, item); + if (!ret){ + cJSON_Delete(item); + } + return ret; +} +static bool tmqAddJsonArrayItem(cJSON *array, cJSON *item){ + bool ret = cJSON_AddItemToArray(array, item); + if (!ret){ + cJSON_Delete(item); + } + return ret; +} + + static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen); static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); } static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t, @@ -68,13 +84,13 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche cJSON* type = cJSON_CreateString("create"); RAW_NULL_CHECK(type); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type)); cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super"); RAW_NULL_CHECK(tableType); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType)); cJSON* tableName = cJSON_CreateString(name); RAW_NULL_CHECK(tableName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName)); cJSON* columns = cJSON_CreateArray(); RAW_NULL_CHECK(columns); @@ -84,25 +100,25 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche SSchema* s = schemaRow->pSchema + i; cJSON* cname = cJSON_CreateString(s->name); RAW_NULL_CHECK(cname); - RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "name", cname)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "name", cname)); cJSON* ctype = cJSON_CreateNumber(s->type); RAW_NULL_CHECK(ctype); - RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "type", ctype)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "type", ctype)); if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = s->bytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "length", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes)); } else if (s->type == TSDB_DATA_TYPE_NCHAR) { int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "length", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "length", cbytes)); } cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY); RAW_NULL_CHECK(isPk); - RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "isPrimarykey", isPk)); - RAW_FALSE_CHECK(cJSON_AddItemToArray(columns, column)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "isPrimarykey", isPk)); + RAW_FALSE_CHECK(tmqAddJsonArrayItem(columns, column)); if (pColCmprRow == NULL) { continue; @@ -124,17 +140,17 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche cJSON* encodeJson = cJSON_CreateString(encode); RAW_NULL_CHECK(encodeJson); - RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "encode", encodeJson)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "encode", encodeJson)); cJSON* compressJson = cJSON_CreateString(compress); RAW_NULL_CHECK(compressJson); - RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "compress", compressJson)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "compress", compressJson)); cJSON* levelJson = cJSON_CreateString(level); RAW_NULL_CHECK(levelJson); - RAW_FALSE_CHECK(cJSON_AddItemToObject(column, "level", levelJson)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "level", levelJson)); } - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "columns", columns)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "columns", columns)); cJSON* tags = cJSON_CreateArray(); RAW_NULL_CHECK(tags); @@ -144,24 +160,24 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche SSchema* s = schemaTag->pSchema + i; cJSON* tname = cJSON_CreateString(s->name); RAW_NULL_CHECK(tname); - RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "name", tname)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname)); cJSON* ttype = cJSON_CreateNumber(s->type); RAW_NULL_CHECK(ttype); - RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "type", ttype)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype)); if (s->type == TSDB_DATA_TYPE_BINARY || s->type == TSDB_DATA_TYPE_VARBINARY || s->type == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = s->bytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "length", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes)); } else if (s->type == TSDB_DATA_TYPE_NCHAR) { int32_t length = (s->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "length", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes)); } - RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag)); + RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag)); } - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags)); end: *pJson = json; @@ -175,7 +191,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) { RAW_NULL_CHECK(encodeStr); cJSON* encodeJson = cJSON_CreateString(encodeStr); RAW_NULL_CHECK(encodeJson); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "encode", encodeJson)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "encode", encodeJson)); return code; } uint8_t compress = COMPRESS_L2_TYPE_U32(para); @@ -184,7 +200,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) { RAW_NULL_CHECK(compressStr); cJSON* compressJson = cJSON_CreateString(compressStr); RAW_NULL_CHECK(compressJson); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "compress", compressJson)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "compress", compressJson)); return code; } uint8_t level = COMPRESS_L2_TYPE_LEVEL_U32(para); @@ -193,7 +209,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) { RAW_NULL_CHECK(levelStr); cJSON* levelJson = cJSON_CreateString(levelStr); RAW_NULL_CHECK(levelJson); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "level", levelJson)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "level", levelJson)); return code; } @@ -214,19 +230,19 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** RAW_NULL_CHECK(json); cJSON* type = cJSON_CreateString("alter"); RAW_NULL_CHECK(type); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type)); SName name = {0}; RAW_RETURN_CHECK(tNameFromString(&name, req.name, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE)); cJSON* tableType = cJSON_CreateString("super"); RAW_NULL_CHECK(tableType); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType)); cJSON* tableName = cJSON_CreateString(name.tname); RAW_NULL_CHECK(tableName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName)); cJSON* alterType = cJSON_CreateNumber(req.alterType); RAW_NULL_CHECK(alterType); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "alterType", alterType)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType)); switch (req.alterType) { case TSDB_ALTER_TABLE_ADD_TAG: case TSDB_ALTER_TABLE_ADD_COLUMN: { @@ -234,22 +250,22 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** RAW_NULL_CHECK(field); cJSON* colName = cJSON_CreateString(field->name); RAW_NULL_CHECK(colName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName)); cJSON* colType = cJSON_CreateNumber(field->type); RAW_NULL_CHECK(colType); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType)); if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY || field->type == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = field->bytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes)); } else if (field->type == TSDB_DATA_TYPE_NCHAR) { int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes)); } break; } @@ -258,22 +274,22 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** RAW_NULL_CHECK(field); cJSON* colName = cJSON_CreateString(field->name); RAW_NULL_CHECK(colName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName)); cJSON* colType = cJSON_CreateNumber(field->type); RAW_NULL_CHECK(colType); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType)); if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY || field->type == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = field->bytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes)); } else if (field->type == TSDB_DATA_TYPE_NCHAR) { int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes)); } RAW_RETURN_CHECK(setCompressOption(json, field->compress)); break; @@ -284,7 +300,7 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** RAW_NULL_CHECK(field); cJSON* colName = cJSON_CreateString(field->name); RAW_NULL_CHECK(colName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName)); break; } case TSDB_ALTER_TABLE_UPDATE_TAG_BYTES: @@ -293,21 +309,21 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** RAW_NULL_CHECK(field); cJSON* colName = cJSON_CreateString(field->name); RAW_NULL_CHECK(colName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName)); cJSON* colType = cJSON_CreateNumber(field->type); RAW_NULL_CHECK(colType); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType)); if (field->type == TSDB_DATA_TYPE_BINARY || field->type == TSDB_DATA_TYPE_VARBINARY || field->type == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = field->bytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes)); } else if (field->type == TSDB_DATA_TYPE_NCHAR) { int32_t length = (field->bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes)); } break; } @@ -319,10 +335,10 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** RAW_NULL_CHECK(newField); cJSON* colName = cJSON_CreateString(oldField->name); RAW_NULL_CHECK(colName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName)); cJSON* colNewName = cJSON_CreateString(newField->name); RAW_NULL_CHECK(colNewName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colNewName", colNewName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName)); break; } case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: { @@ -330,7 +346,7 @@ static void buildAlterSTableJson(void* alterData, int32_t alterDataLen, cJSON** RAW_NULL_CHECK(field); cJSON* colName = cJSON_CreateString(field->name); RAW_NULL_CHECK(colName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName)); RAW_RETURN_CHECK(setCompressOption(json, field->bytes)); break; } @@ -392,21 +408,20 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { uint8_t tagNum = pCreateReq->ctb.tagNum; int32_t code = 0; cJSON* tags = NULL; + SArray* pTagVals = NULL; cJSON* tableName = cJSON_CreateString(name); RAW_NULL_CHECK(tableName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName)); cJSON* using = cJSON_CreateString(sname); RAW_NULL_CHECK(using); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "using", using)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "using", using)); cJSON* tagNumJson = cJSON_CreateNumber(tagNum); RAW_NULL_CHECK(tagNumJson); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tagNum", tagNumJson)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tagNum", tagNumJson)); tags = cJSON_CreateArray(); RAW_NULL_CHECK(tags); - SArray* pTagVals = NULL; RAW_RETURN_CHECK(tTagToValArray(pTag, &pTagVals)); - if (tTagIsJson(pTag)) { STag* p = (STag*)pTag; if (p->nTag == 0) { @@ -427,14 +442,14 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { RAW_NULL_CHECK(ptname); cJSON* tname = cJSON_CreateString(ptname); RAW_NULL_CHECK(tname); - RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "name", tname)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname)); cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON); RAW_NULL_CHECK(ttype); - RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "type", ttype)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype)); cJSON* tvalue = cJSON_CreateString(pJson); RAW_NULL_CHECK(tvalue); - RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "value", tvalue)); - RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue)); + RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag)); taosMemoryFree(pJson); goto end; } @@ -448,10 +463,10 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { RAW_NULL_CHECK(ptname); cJSON* tname = cJSON_CreateString(ptname); RAW_NULL_CHECK(tname); - RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "name", tname)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "name", tname)); cJSON* ttype = cJSON_CreateNumber(pTagVal->type); RAW_NULL_CHECK(ttype); - RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "type", ttype)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "type", ttype)); cJSON* tvalue = NULL; if (IS_VAR_DATA_TYPE(pTagVal->type)) { @@ -481,12 +496,12 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { RAW_NULL_CHECK(tvalue); } - RAW_FALSE_CHECK(cJSON_AddItemToObject(tag, "value", tvalue)); - RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, tag)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue)); + RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag)); } + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags)); end: - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags)); taosArrayDestroy(pTagVals); } @@ -497,11 +512,11 @@ static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSO RAW_NULL_CHECK(json); cJSON* type = cJSON_CreateString("create"); RAW_NULL_CHECK(type); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type)); cJSON* tableType = cJSON_CreateString("child"); RAW_NULL_CHECK(tableType); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType)); buildChildElement(json, pCreateReq); cJSON* createList = cJSON_CreateArray(); @@ -510,9 +525,9 @@ static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSO cJSON* create = cJSON_CreateObject(); RAW_NULL_CHECK(create); buildChildElement(create, pCreateReq + i); - RAW_FALSE_CHECK(cJSON_AddItemToArray(createList, create)); + RAW_FALSE_CHECK(tmqAddJsonArrayItem(createList, create)); } - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "createList", createList)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "createList", createList)); end: *pJson = json; @@ -619,62 +634,62 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) { RAW_NULL_CHECK(json); cJSON* type = cJSON_CreateString("alter"); RAW_NULL_CHECK(type); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type)); cJSON* tableType = cJSON_CreateString(vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_TAG_VAL || vAlterTbReq.action == TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL ? "child" : "normal"); RAW_NULL_CHECK(tableType); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType)); cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName); RAW_NULL_CHECK(tableName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName)); cJSON* alterType = cJSON_CreateNumber(vAlterTbReq.action); RAW_NULL_CHECK(alterType); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "alterType", alterType)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "alterType", alterType)); switch (vAlterTbReq.action) { case TSDB_ALTER_TABLE_ADD_COLUMN: { cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); RAW_NULL_CHECK(colName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName)); cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type); RAW_NULL_CHECK(colType); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType)); if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY || vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes)); } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) { int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes)); } break; } case TSDB_ALTER_TABLE_ADD_COLUMN_WITH_COMPRESS_OPTION: { cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); RAW_NULL_CHECK(colName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName)); cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type); RAW_NULL_CHECK(colType); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType)); if (vAlterTbReq.type == TSDB_DATA_TYPE_BINARY || vAlterTbReq.type == TSDB_DATA_TYPE_VARBINARY || vAlterTbReq.type == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = vAlterTbReq.bytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes)); } else if (vAlterTbReq.type == TSDB_DATA_TYPE_NCHAR) { int32_t length = (vAlterTbReq.bytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes)); } RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress)); break; @@ -682,43 +697,43 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) { case TSDB_ALTER_TABLE_DROP_COLUMN: { cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); RAW_NULL_CHECK(colName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName)); break; } case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES: { cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); RAW_NULL_CHECK(colName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName)); cJSON* colType = cJSON_CreateNumber(vAlterTbReq.colModType); RAW_NULL_CHECK(colType); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colType", colType)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colType", colType)); if (vAlterTbReq.colModType == TSDB_DATA_TYPE_BINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_VARBINARY || vAlterTbReq.colModType == TSDB_DATA_TYPE_GEOMETRY) { int32_t length = vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes)); } else if (vAlterTbReq.colModType == TSDB_DATA_TYPE_NCHAR) { int32_t length = (vAlterTbReq.colModBytes - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE; cJSON* cbytes = cJSON_CreateNumber(length); RAW_NULL_CHECK(cbytes); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colLength", cbytes)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colLength", cbytes)); } break; } case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME: { cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); RAW_NULL_CHECK(colName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName)); cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName); RAW_NULL_CHECK(colNewName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colNewName", colNewName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colNewName", colNewName)); break; } case TSDB_ALTER_TABLE_UPDATE_TAG_VAL: { cJSON* tagName = cJSON_CreateString(vAlterTbReq.tagName); RAW_NULL_CHECK(tagName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", tagName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", tagName)); bool isNull = vAlterTbReq.isNull; if (vAlterTbReq.tagType == TSDB_DATA_TYPE_JSON) { @@ -757,12 +772,12 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) { cJSON* colValue = cJSON_CreateString(buf); taosMemoryFree(buf); RAW_NULL_CHECK(colValue); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colValue", colValue)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValue", colValue)); } cJSON* isNullCJson = cJSON_CreateBool(isNull); RAW_NULL_CHECK(isNullCJson); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colValueNull", isNullCJson)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colValueNull", isNullCJson)); break; } case TSDB_ALTER_TABLE_UPDATE_MULTI_TAG_VAL: { @@ -781,7 +796,7 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) { SMultiTagUpateVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i); cJSON* tagName = cJSON_CreateString(pTagVal->tagName); RAW_NULL_CHECK(tagName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(member, "colName", tagName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colName", tagName)); if (pTagVal->tagType == TSDB_DATA_TYPE_JSON) { uError("processAlterTable isJson false"); @@ -806,21 +821,21 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) { cJSON* colValue = cJSON_CreateString(buf); taosMemoryFree(buf); RAW_NULL_CHECK(colValue); - RAW_FALSE_CHECK(cJSON_AddItemToObject(member, "colValue", colValue)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValue", colValue)); } cJSON* isNullCJson = cJSON_CreateBool(isNull); RAW_NULL_CHECK(isNullCJson); - RAW_FALSE_CHECK(cJSON_AddItemToObject(member, "colValueNull", isNullCJson)); - RAW_FALSE_CHECK(cJSON_AddItemToArray(tags, member)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValueNull", isNullCJson)); + RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, member)); } - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tags", tags)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags)); break; } case TSDB_ALTER_TABLE_UPDATE_COLUMN_COMPRESS: { cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); RAW_NULL_CHECK(colName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "colName", colName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "colName", colName)); RAW_RETURN_CHECK(setCompressOption(json, vAlterTbReq.compress)); break; } @@ -858,13 +873,13 @@ static void processDropSTable(SMqMetaRsp* metaRsp, cJSON** pJson) { RAW_NULL_CHECK(json); cJSON* type = cJSON_CreateString("drop"); RAW_NULL_CHECK(type); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type)); cJSON* tableType = cJSON_CreateString("super"); RAW_NULL_CHECK(tableType); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableType", tableType)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableType", tableType)); cJSON* tableName = cJSON_CreateString(req.name); RAW_NULL_CHECK(tableName); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableName", tableName)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName)); end: uDebug("processDropSTable return"); @@ -897,10 +912,10 @@ static void processDeleteTable(SMqMetaRsp* metaRsp, cJSON** pJson) { RAW_NULL_CHECK(json); cJSON* type = cJSON_CreateString("delete"); RAW_NULL_CHECK(type); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type)); cJSON* sqlJson = cJSON_CreateString(sql); RAW_NULL_CHECK(sqlJson); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "sql", sqlJson)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "sql", sqlJson)); end: uDebug("processDeleteTable return"); @@ -928,16 +943,16 @@ static void processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) { RAW_NULL_CHECK(json); cJSON* type = cJSON_CreateString("drop"); RAW_NULL_CHECK(type); - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "type", type)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type)); cJSON* tableNameList = cJSON_CreateArray(); RAW_NULL_CHECK(tableNameList); for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { SVDropTbReq* pDropTbReq = req.pReqs + iReq; cJSON* tableName = cJSON_CreateString(pDropTbReq->name); RAW_NULL_CHECK(tableName); - RAW_FALSE_CHECK(cJSON_AddItemToArray(tableNameList, tableName)); + RAW_FALSE_CHECK(tmqAddJsonArrayItem(tableNameList, tableName)); } - RAW_FALSE_CHECK(cJSON_AddItemToObject(json, "tableNameList", tableNameList)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableNameList", tableNameList)); end: uDebug("processDropTable return"); @@ -2198,10 +2213,10 @@ static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) { cJSON* pItem = NULL; processSimpleMeta(&metaRsp, &pItem); tDeleteMqMetaRsp(&metaRsp); - RAW_FALSE_CHECK(cJSON_AddItemToArray(pMetaArr, pItem)); + RAW_FALSE_CHECK(tmqAddJsonArrayItem(pMetaArr, pItem)); } - RAW_FALSE_CHECK(cJSON_AddItemToObject(pJson, "metas", pMetaArr)); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(pJson, "metas", pMetaArr)); tDeleteMqBatchMetaRsp(&rsp); char* fullStr = cJSON_PrintUnformatted(pJson); cJSON_Delete(pJson); From 325dcac84b36453a9722cafdc88ec24630f198c3 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 9 Dec 2024 17:23:02 +0800 Subject: [PATCH 2/4] fix:[TD-33057] memory leak --- source/client/src/clientRawBlockWrite.c | 53 ++++++++++++------------- 1 file changed, 26 insertions(+), 27 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 987bed1cf5..9ce2f726a5 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -94,9 +94,12 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche cJSON* columns = cJSON_CreateArray(); RAW_NULL_CHECK(columns); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "columns", columns)); + for (int i = 0; i < schemaRow->nCols; i++) { cJSON* column = cJSON_CreateObject(); RAW_NULL_CHECK(column); + RAW_FALSE_CHECK(tmqAddJsonArrayItem(columns, column)); SSchema* s = schemaRow->pSchema + i; cJSON* cname = cJSON_CreateString(s->name); RAW_NULL_CHECK(cname); @@ -118,7 +121,6 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche cJSON* isPk = cJSON_CreateBool(s->flags & COL_IS_KEY); RAW_NULL_CHECK(isPk); RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "isPrimarykey", isPk)); - RAW_FALSE_CHECK(tmqAddJsonArrayItem(columns, column)); if (pColCmprRow == NULL) { continue; @@ -150,13 +152,15 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche RAW_NULL_CHECK(levelJson); RAW_FALSE_CHECK(tmqAddJsonObjectItem(column, "level", levelJson)); } - RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "columns", columns)); cJSON* tags = cJSON_CreateArray(); RAW_NULL_CHECK(tags); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags)); + for (int i = 0; schemaTag && i < schemaTag->nCols; i++) { cJSON* tag = cJSON_CreateObject(); RAW_NULL_CHECK(tag); + RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag)); SSchema* s = schemaTag->pSchema + i; cJSON* tname = cJSON_CreateString(s->name); RAW_NULL_CHECK(tname); @@ -175,9 +179,7 @@ static void buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sche RAW_NULL_CHECK(cbytes); RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "length", cbytes)); } - RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag)); } - RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags)); end: *pJson = json; @@ -407,8 +409,9 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { int64_t id = pCreateReq->uid; uint8_t tagNum = pCreateReq->ctb.tagNum; int32_t code = 0; - cJSON* tags = NULL; SArray* pTagVals = NULL; + char* pJson = NULL; + cJSON* tableName = cJSON_CreateString(name); RAW_NULL_CHECK(tableName); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableName", tableName)); @@ -419,8 +422,9 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { RAW_NULL_CHECK(tagNumJson); RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tagNum", tagNumJson)); - tags = cJSON_CreateArray(); + cJSON* tags = cJSON_CreateArray(); RAW_NULL_CHECK(tags); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags)); RAW_RETURN_CHECK(tTagToValArray(pTag, &pTagVals)); if (tTagIsJson(pTag)) { STag* p = (STag*)pTag; @@ -428,14 +432,11 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { uError("p->nTag == 0"); goto end; } - char* pJson = NULL; parseTagDatatoJson(pTag, &pJson); - if (pJson == NULL) { - uError("parseTagDatatoJson failed, pJson == NULL"); - goto end; - } + RAW_NULL_CHECK(pJson); cJSON* tag = cJSON_CreateObject(); RAW_NULL_CHECK(tag); + RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag)); STagVal* pTagVal = taosArrayGet(pTagVals, 0); RAW_NULL_CHECK(pTagVal); char* ptname = taosArrayGet(tagName, 0); @@ -449,8 +450,6 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { cJSON* tvalue = cJSON_CreateString(pJson); RAW_NULL_CHECK(tvalue); RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue)); - RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag)); - taosMemoryFree(pJson); goto end; } @@ -459,6 +458,7 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { RAW_NULL_CHECK(pTagVal); cJSON* tag = cJSON_CreateObject(); RAW_NULL_CHECK(tag); + RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag)); char* ptname = taosArrayGet(tagName, i); RAW_NULL_CHECK(ptname); cJSON* tname = cJSON_CreateString(ptname); @@ -470,25 +470,22 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { cJSON* tvalue = NULL; if (IS_VAR_DATA_TYPE(pTagVal->type)) { - char* buf = NULL; int64_t bufSize = 0; if (pTagVal->type == TSDB_DATA_TYPE_VARBINARY) { bufSize = pTagVal->nData * 2 + 2 + 3; } else { bufSize = pTagVal->nData + 3; } - buf = taosMemoryCalloc(bufSize, 1); - + char* buf = taosMemoryCalloc(bufSize, 1); RAW_NULL_CHECK(buf); - if (!buf) goto end; if (dataConverToStr(buf, bufSize, pTagVal->type, pTagVal->pData, pTagVal->nData, NULL) != TSDB_CODE_SUCCESS) { taosMemoryFree(buf); goto end; } tvalue = cJSON_CreateString(buf); - RAW_NULL_CHECK(tvalue); taosMemoryFree(buf); + RAW_NULL_CHECK(tvalue); } else { double val = 0; GET_TYPED_DATA(val, double, pTagVal->type, &pTagVal->i64); @@ -497,11 +494,10 @@ static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { } RAW_FALSE_CHECK(tmqAddJsonObjectItem(tag, "value", tvalue)); - RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, tag)); } - RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags)); end: + taosMemoryFree(pJson); taosArrayDestroy(pTagVals); } @@ -521,13 +517,14 @@ static void buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs, cJSO buildChildElement(json, pCreateReq); cJSON* createList = cJSON_CreateArray(); RAW_NULL_CHECK(createList); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "createList", createList)); + for (int i = 0; nReqs > 1 && i < nReqs; i++) { cJSON* create = cJSON_CreateObject(); RAW_NULL_CHECK(create); buildChildElement(create, pCreateReq + i); RAW_FALSE_CHECK(tmqAddJsonArrayItem(createList, create)); } - RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "createList", createList)); end: *pJson = json; @@ -789,9 +786,12 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) { cJSON* tags = cJSON_CreateArray(); RAW_NULL_CHECK(tags); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags)); + for (int32_t i = 0; i < nTags; i++) { cJSON* member = cJSON_CreateObject(); RAW_NULL_CHECK(member); + RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, member)); SMultiTagUpateVal* pTagVal = taosArrayGet(vAlterTbReq.pMultiTag, i); cJSON* tagName = cJSON_CreateString(pTagVal->tagName); @@ -804,14 +804,13 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) { } bool isNull = pTagVal->isNull; if (!isNull) { - char* buf = NULL; int64_t bufSize = 0; if (pTagVal->tagType == TSDB_DATA_TYPE_VARBINARY) { bufSize = pTagVal->nTagVal * 2 + 2 + 3; } else { bufSize = pTagVal->nTagVal + 3; } - buf = taosMemoryCalloc(bufSize, 1); + char* buf = taosMemoryCalloc(bufSize, 1); RAW_NULL_CHECK(buf); if (dataConverToStr(buf, bufSize, pTagVal->tagType, pTagVal->pTagVal, pTagVal->nTagVal, NULL) != TSDB_CODE_SUCCESS) { @@ -826,9 +825,7 @@ static void processAlterTable(SMqMetaRsp* metaRsp, cJSON** pJson) { cJSON* isNullCJson = cJSON_CreateBool(isNull); RAW_NULL_CHECK(isNullCJson); RAW_FALSE_CHECK(tmqAddJsonObjectItem(member, "colValueNull", isNullCJson)); - RAW_FALSE_CHECK(tmqAddJsonArrayItem(tags, member)); } - RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tags", tags)); break; } @@ -946,13 +943,14 @@ static void processDropTable(SMqMetaRsp* metaRsp, cJSON** pJson) { RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "type", type)); cJSON* tableNameList = cJSON_CreateArray(); RAW_NULL_CHECK(tableNameList); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableNameList", tableNameList)); + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { SVDropTbReq* pDropTbReq = req.pReqs + iReq; cJSON* tableName = cJSON_CreateString(pDropTbReq->name); RAW_NULL_CHECK(tableName); RAW_FALSE_CHECK(tmqAddJsonArrayItem(tableNameList, tableName)); } - RAW_FALSE_CHECK(tmqAddJsonObjectItem(json, "tableNameList", tableNameList)); end: uDebug("processDropTable return"); @@ -2198,6 +2196,8 @@ static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) { RAW_FALSE_CHECK(cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION)); cJSON* pMetaArr = cJSON_CreateArray(); RAW_NULL_CHECK(pMetaArr); + RAW_FALSE_CHECK(tmqAddJsonObjectItem(pJson, "metas", pMetaArr)); + int32_t num = taosArrayGetSize(rsp.batchMetaReq); for (int32_t i = 0; i < num; i++) { int32_t* len = taosArrayGet(rsp.batchMetaLen, i); @@ -2216,7 +2216,6 @@ static void processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp, char** string) { RAW_FALSE_CHECK(tmqAddJsonArrayItem(pMetaArr, pItem)); } - RAW_FALSE_CHECK(tmqAddJsonObjectItem(pJson, "metas", pMetaArr)); tDeleteMqBatchMetaRsp(&rsp); char* fullStr = cJSON_PrintUnformatted(pJson); cJSON_Delete(pJson); From 9b554aec8315dbfe31e1edc1696251143a3b96a2 Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 9 Dec 2024 19:10:29 +0800 Subject: [PATCH 3/4] doc: default compress algorith --- docs/en/14-reference/03-taos-sql/31-compress.md | 16 +++++++++------- docs/zh/14-reference/03-taos-sql/32-compress.md | 16 +++++++++------- 2 files changed, 18 insertions(+), 14 deletions(-) diff --git a/docs/en/14-reference/03-taos-sql/31-compress.md b/docs/en/14-reference/03-taos-sql/31-compress.md index 39abfe69bd..2ac269a2e9 100644 --- a/docs/en/14-reference/03-taos-sql/31-compress.md +++ b/docs/en/14-reference/03-taos-sql/31-compress.md @@ -28,13 +28,15 @@ In this article, it specifically refers to the level within the secondary compre - Default compression algorithm list and applicable range for each data type -| Data Type | Optional Encoding Algorithm | Default Encoding Algorithm | Optional Compression Algorithm|Default Compression Algorithm| Default Compression Level| -| :-----------:|:----------:|:-------:|:-------:|:----------:|:----:| -| tinyint/untinyint/smallint/usmallint/int/uint | simple8b| simple8b | lz4/zlib/zstd/xz| lz4 | medium| -| bigint/ubigint/timestamp | simple8b/delta-i | delta-i |lz4/zlib/zstd/xz | lz4| medium| -|float/double | delta-d|delta-d |lz4/zlib/zstd/xz/tsz|lz4| medium| -|binary/nchar| disabled| disabled|lz4/zlib/zstd/xz| lz4| medium| -|bool| bit-packing| bit-packing| lz4/zlib/zstd/xz| lz4| medium| +| Data Type |Available Encoding Algorithms | Default Encoding Algorithm | Available Compression Algorithms | Default Compression Algorithm | Default Compression Level | +|:------------------------------------:|:----------------:|:-----------:|:--------------------:|:----:|:------:| +| int/uint | simple8b | simple8b | lz4/zlib/zstd/xz | lz4 | medium | +| tinyint/untinyint/smallint/usmallint | simple8b | simple8b | lz4/zlib/zstd/xz | zlib | medium | +| bigint/ubigint/timestamp | simple8b/delta-i | delta-i | lz4/zlib/zstd/xz | lz4 | medium | +| float/double | delta-d | delta-d | lz4/zlib/zstd/xz/tsz | lz4 | medium | +| binary/nchar | disabled | disabled | lz4/zlib/zstd/xz | zstd | medium | +| bool | bit-packing | bit-packing | lz4/zlib/zstd/xz | zstd | medium | + ## SQL diff --git a/docs/zh/14-reference/03-taos-sql/32-compress.md b/docs/zh/14-reference/03-taos-sql/32-compress.md index 0f2b260832..2ef6e9e06a 100644 --- a/docs/zh/14-reference/03-taos-sql/32-compress.md +++ b/docs/zh/14-reference/03-taos-sql/32-compress.md @@ -29,13 +29,15 @@ description: 可配置压缩算法 - 各个数据类型的默认压缩算法列表和适用范围 -| 数据类型 | 可选编码算法 | 编码算法默认值 | 可选压缩算法|压缩算法默认值| 压缩等级默认值| -| :-----------:|:----------:|:-------:|:-------:|:----------:|:----:| -| tinyint/untinyint/smallint/usmallint/int/uint | simple8b| simple8b | lz4/zlib/zstd/xz| lz4 | medium| -| bigint/ubigint/timestamp | simple8b/delta-i | delta-i |lz4/zlib/zstd/xz | lz4| medium| -|float/double | delta-d|delta-d |lz4/zlib/zstd/xz/tsz|lz4| medium| -|binary/nchar| disabled| disabled|lz4/zlib/zstd/xz| lz4| medium| -|bool| bit-packing| bit-packing| lz4/zlib/zstd/xz| lz4| medium| +| 数据类型 | 可选编码算法 | 编码算法默认值 | 可选压缩算法 | 压缩算法默认值 |压缩等级默认值| +|:------------------------------------:|:----------------:|:-----------:|:--------------------:|:----:|:------:| +| int/uint | simple8b | simple8b | lz4/zlib/zstd/xz | lz4 | medium | +| tinyint/untinyint/smallint/usmallint | simple8b | simple8b | lz4/zlib/zstd/xz | zlib | medium | +| bigint/ubigint/timestamp | simple8b/delta-i | delta-i | lz4/zlib/zstd/xz | lz4 | medium | +| float/double | delta-d | delta-d | lz4/zlib/zstd/xz/tsz | lz4 | medium | +| binary/nchar | disabled | disabled | lz4/zlib/zstd/xz | zstd | medium | +| bool | bit-packing | bit-packing | lz4/zlib/zstd/xz | zstd | medium | + ## SQL 语法 From b8c32776c96fe6c0d149ddd2cac31ebfc77801fc Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Mon, 9 Dec 2024 19:12:11 +0800 Subject: [PATCH 4/4] doc: default compress algorithm --- docs/en/14-reference/03-taos-sql/32-compress.md | 15 ++++++++------- 1 file changed, 8 insertions(+), 7 deletions(-) diff --git a/docs/en/14-reference/03-taos-sql/32-compress.md b/docs/en/14-reference/03-taos-sql/32-compress.md index efe1959b1a..8bb2ae31e3 100644 --- a/docs/en/14-reference/03-taos-sql/32-compress.md +++ b/docs/en/14-reference/03-taos-sql/32-compress.md @@ -28,13 +28,14 @@ In this document, it specifically refers to the internal levels of the second-le - Default compression algorithms list and applicable range for each data type -| Data Type | Available Encoding Algorithms | Default Encoding Algorithm | Available Compression Algorithms|Default Compression Algorithm| Default Compression Level| -| :-----------:|:----------:|:-------:|:-------:|:----------:|:----:| -| tinyint/untinyint/smallint/usmallint/int/uint | simple8b| simple8b | lz4/zlib/zstd/xz| lz4 | medium| -| bigint/ubigint/timestamp | simple8b/delta-i | delta-i |lz4/zlib/zstd/xz | lz4| medium| -|float/double | delta-d|delta-d |lz4/zlib/zstd/xz/tsz|lz4| medium| -|binary/nchar| disabled| disabled|lz4/zlib/zstd/xz| lz4| medium| -|bool| bit-packing| bit-packing| lz4/zlib/zstd/xz| lz4| medium| +| Data Type |Available Encoding Algorithms | Default Encoding Algorithm | Available Compression Algorithms | Default Compression Algorithm | Default Compression Level | +|:------------------------------------:|:----------------:|:-----------:|:--------------------:|:----:|:------:| +| int/uint | simple8b | simple8b | lz4/zlib/zstd/xz | lz4 | medium | +| tinyint/untinyint/smallint/usmallint | simple8b | simple8b | lz4/zlib/zstd/xz | zlib | medium | +| bigint/ubigint/timestamp | simple8b/delta-i | delta-i | lz4/zlib/zstd/xz | lz4 | medium | +| float/double | delta-d | delta-d | lz4/zlib/zstd/xz/tsz | lz4 | medium | +| binary/nchar | disabled | disabled | lz4/zlib/zstd/xz | zstd | medium | +| bool | bit-packing | bit-packing | lz4/zlib/zstd/xz | zstd | medium | ## SQL Syntax