From d815bb117be31fcd233a1c204fb3827b0a9dbd31 Mon Sep 17 00:00:00 2001 From: 54liuyao <54liuyao> Date: Tue, 28 May 2024 16:56:23 +0800 Subject: [PATCH] add json for batch meta --- include/client/taos.h | 3 +- source/client/src/clientRawBlockWrite.c | 127 +++++++++++------------- utils/test/c/tmq_taosx_ci.c | 20 +++- 3 files changed, 75 insertions(+), 75 deletions(-) diff --git a/include/client/taos.h b/include/client/taos.h index 66b5b92869..46e4e7633b 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -282,8 +282,7 @@ typedef enum tmq_res_t { TMQ_RES_INVALID = -1, TMQ_RES_DATA = 1, TMQ_RES_TABLE_META = 2, - TMQ_RES_METADATA = 3, - TMQ_RES_BATCH_TABLE_META = 4, + TMQ_RES_METADATA = 3 } tmq_res_t; typedef struct tmq_topic_assignment { diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index d5f478d188..5e6d693562 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -26,11 +26,13 @@ #define LOG_ID_TAG "connId:0x%" PRIx64 ",reqId:0x%" PRIx64 #define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId +#define TMQ_META_VERSION "1.0" + static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen); static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); } -static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, +static cJSON* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t, SColCmprWrapper* pColCmprRow) { int8_t buildDefaultCompress = 0; if (pColCmprRow->nCols <= 0) { @@ -124,9 +126,7 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch } cJSON_AddItemToObject(json, "tags", tags); - string = cJSON_PrintUnformatted(json); - cJSON_Delete(json); - return string; + return json; } static int32_t setCompressOption(cJSON* json, uint32_t para) { @@ -153,7 +153,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) { } return 0; } -static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { +static cJSON* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { SMAlterStbReq req = {0}; cJSON* json = NULL; char* string = NULL; @@ -247,18 +247,16 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) { default: break; } - string = cJSON_PrintUnformatted(json); end: - cJSON_Delete(json); tFreeSMAltertbReq(&req); - return string; + return json; } -static char* processCreateStb(SMqMetaRsp* metaRsp) { +static cJSON* processCreateStb(SMqMetaRsp* metaRsp) { SVCreateStbReq req = {0}; SDecoder coder; - char* string = NULL; + cJSON* pJson = NULL; uDebug("create stable data:%p", metaRsp); // decode and process req @@ -269,17 +267,17 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) { if (tDecodeSVCreateStbReq(&coder, &req) < 0) { goto _err; } - string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr); + pJson = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr); _err: - uDebug("create stable return, sql json:%s", string); + uDebug("create stable return, sql json:%s", cJSON_PrintUnformatted(pJson)); tDecoderClear(&coder); - return string; + return pJson; } -static char* processAlterStb(SMqMetaRsp* metaRsp) { +static cJSON* processAlterStb(SMqMetaRsp* metaRsp) { SVCreateStbReq req = {0}; SDecoder coder; - char* string = NULL; + cJSON* pJson = NULL; uDebug("alter stable data:%p", metaRsp); // decode and process req @@ -290,11 +288,11 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) { if (tDecodeSVCreateStbReq(&coder, &req) < 0) { goto _err; } - string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); + pJson = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen); _err: - uDebug("alter stable return, sql json:%s", string); + uDebug("alter stable return, sql json:%s", cJSON_PrintUnformatted(pJson)); tDecoderClear(&coder); - return string; + return pJson; } static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) { @@ -387,7 +385,7 @@ end: taosArrayDestroy(pTagVals); } -static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) { +static cJSON* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) { char* string = NULL; cJSON* json = cJSON_CreateObject(); if (json == NULL) { @@ -412,16 +410,14 @@ static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) { cJSON_AddItemToArray(createList, create); } cJSON_AddItemToObject(json, "createList", createList); - string = cJSON_PrintUnformatted(json); - cJSON_Delete(json); - return string; + return json; } -static char* processCreateTable(SMqMetaRsp* metaRsp) { +static cJSON* processCreateTable(SMqMetaRsp* metaRsp) { SDecoder decoder = {0}; SVCreateTbBatchReq req = {0}; SVCreateTbReq* pCreateReq; - char* string = NULL; + cJSON* pJson = NULL; // decode uDebug("create table data:%p", metaRsp); void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); @@ -435,15 +431,15 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { if (req.nReqs > 0) { pCreateReq = req.pReqs; if (pCreateReq->type == TSDB_CHILD_TABLE) { - string = buildCreateCTableJson(req.pReqs, req.nReqs); + pJson = buildCreateCTableJson(req.pReqs, req.nReqs); } else if (pCreateReq->type == TSDB_NORMAL_TABLE) { - string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, + pJson = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE, &pCreateReq->colCmpr); } } _exit: - uDebug("create table return, sql json:%s", string); + uDebug("create table return, sql json:%s", cJSON_PrintUnformatted(pJson)); for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { pCreateReq = req.pReqs + iReq; taosMemoryFreeClear(pCreateReq->comment); @@ -453,7 +449,7 @@ _exit: } } tDecoderClear(&decoder); - return string; + return pJson; } static char* processAutoCreateTable(STaosxRsp* rsp) { @@ -482,7 +478,9 @@ static char* processAutoCreateTable(STaosxRsp* rsp) { goto _exit; } } - string = buildCreateCTableJson(pCreateReq, rsp->createTableNum); + cJSON* pJson = buildCreateCTableJson(pCreateReq, rsp->createTableNum); + string = cJSON_PrintUnformatted(pJson); + cJSON_Delete(pJson); _exit: uDebug("auto created table return, sql json:%s", string); for (int i = 0; i < rsp->createTableNum; i++) { @@ -497,7 +495,7 @@ _exit: return string; } -static char* processAlterTable(SMqMetaRsp* metaRsp) { +static cJSON* processAlterTable(SMqMetaRsp* metaRsp) { SDecoder decoder = {0}; SVAlterTbReq vAlterTbReq = {0}; char* string = NULL; @@ -622,19 +620,16 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) { default: break; } - string = cJSON_PrintUnformatted(json); _exit: - uDebug("alter table return, sql json:%s", string); - cJSON_Delete(json); + uDebug("alter table return, sql json:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&decoder); - return string; + return json; } -static char* processDropSTable(SMqMetaRsp* metaRsp) { +static cJSON* processDropSTable(SMqMetaRsp* metaRsp) { SDecoder decoder = {0}; SVDropStbReq req = {0}; - char* string = NULL; cJSON* json = NULL; uDebug("processDropSTable data:%p", metaRsp); @@ -659,18 +654,15 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) { cJSON* tableName = cJSON_CreateString(req.name); cJSON_AddItemToObject(json, "tableName", tableName); - string = cJSON_PrintUnformatted(json); _exit: - uDebug("processDropSTable return, sql json:%s", string); - cJSON_Delete(json); + uDebug("processDropSTable return, sql json:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&decoder); - return string; + return json; } -static char* processDeleteTable(SMqMetaRsp* metaRsp) { +static cJSON* processDeleteTable(SMqMetaRsp* metaRsp) { SDeleteRes req = {0}; SDecoder coder = {0}; cJSON* json = NULL; - char* string = NULL; uDebug("processDeleteTable data:%p", metaRsp); // decode and process req @@ -698,18 +690,15 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp) { cJSON* sqlJson = cJSON_CreateString(sql); cJSON_AddItemToObject(json, "sql", sqlJson); - string = cJSON_PrintUnformatted(json); _exit: - uDebug("processDeleteTable return, sql json:%s", string); - cJSON_Delete(json); + uDebug("processDeleteTable return, sql json:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&coder); - return string; + return json; } -static char* processDropTable(SMqMetaRsp* metaRsp) { +static cJSON* processDropTable(SMqMetaRsp* metaRsp) { SDecoder decoder = {0}; SVDropTbBatchReq req = {0}; - char* string = NULL; cJSON* json = NULL; uDebug("processDropTable data:%p", metaRsp); @@ -743,12 +732,10 @@ static char* processDropTable(SMqMetaRsp* metaRsp) { } cJSON_AddItemToObject(json, "tableNameList", tableNameList); - string = cJSON_PrintUnformatted(json); _exit: - uDebug("processDropTable return, json sql:%s", string); - cJSON_Delete(json); + uDebug("processDropTable return, json sql:%s", cJSON_PrintUnformatted(json)); tDecoderClear(&decoder); - return string; + return json; } static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) { @@ -1805,6 +1792,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) { char err[ERR_MSG_LEN] = {0}; code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, fields, pSW->nCols, true, err, ERR_MSG_LEN); taosMemoryFree(fields); + taosMemoryFreeClear(pTableMeta); if (code != TSDB_CODE_SUCCESS) { SET_ERROR_MSG("table:%s, err:%s", tbName, err); goto end; @@ -1997,6 +1985,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen) char err[ERR_MSG_LEN] = {0}; code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN); taosMemoryFree(fields); + taosMemoryFreeClear(pTableMeta); if (code != TSDB_CODE_SUCCESS) { SET_ERROR_MSG("table:%s, err:%s", tbName, err); goto end; @@ -2027,7 +2016,7 @@ end: return code; } -static char* processSimpleMeta(SMqMetaRsp* pMetaRsp) { +static cJSON* processSimpleMeta(SMqMetaRsp* pMetaRsp) { if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) { return processCreateStb(pMetaRsp); } else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_STB) { @@ -2051,15 +2040,15 @@ static char* processSimpleMeta(SMqMetaRsp* pMetaRsp) { static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) { SDecoder coder; SMqBatchMetaRsp rsp = {0}; - SArray* strArray = NULL; tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen); if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) { goto _end; } - int64_t fullSize = 0; + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION); + cJSON* pMetaArr = cJSON_CreateArray(); int32_t num = taosArrayGetSize(rsp.batchMetaReq); - strArray = taosArrayInit(num, POINTER_BYTES); for (int32_t i = 0; i < num; i++) { int32_t len = *(int32_t*)taosArrayGet(rsp.batchMetaLen, i); void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i); @@ -2069,26 +2058,19 @@ static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) { if(tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0 ) { goto _end; } - char* subStr = processSimpleMeta(&metaRsp); + cJSON* pItem = processSimpleMeta(&metaRsp); tDeleteMqMetaRsp(&metaRsp); - fullSize += strlen(subStr); - taosArrayPush(strArray, &subStr); + cJSON_AddItemToArray(pMetaArr, pItem); } - char* buf = (char*)taosMemoryCalloc(1, fullSize + num + 1); - for (int32_t i = 0; i < num; i++) { - char* subStr = taosArrayGetP(strArray, i); - strcat(buf, subStr); - if (i < num - 1) { - strcat(buf, "\n"); - } - } - taosArrayDestroyP(strArray, taosMemoryFree); + cJSON_AddItemToObject(pJson, "metas", pMetaArr); tDeleteMqBatchMetaRsp(&rsp); - return buf; + char* fullStr = cJSON_PrintUnformatted(pJson); + cJSON_Delete(pJson); + return fullStr; _end: - taosArrayDestroyP(strArray, taosMemoryFree); + cJSON_Delete(pJson); tDeleteMqBatchMetaRsp(&rsp); return NULL; } @@ -2109,7 +2091,10 @@ char* tmq_get_json_meta(TAOS_RES* res) { } SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; - return processSimpleMeta(&pMetaRspObj->metaRsp); + cJSON* pJson = processSimpleMeta(&pMetaRspObj->metaRsp); + char* string = cJSON_PrintUnformatted(pJson); + cJSON_Delete(pJson); + return string; } void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); } diff --git a/utils/test/c/tmq_taosx_ci.c b/utils/test/c/tmq_taosx_ci.c index ef2d70f54f..c1b312335f 100644 --- a/utils/test/c/tmq_taosx_ci.c +++ b/utils/test/c/tmq_taosx_ci.c @@ -18,6 +18,7 @@ #include #include #include +#include "cJSON.h" #include "taos.h" #include "tmsg.h" #include "types.h" @@ -61,8 +62,23 @@ static void msg_process(TAOS_RES* msg) { if (result) { printf("meta result: %s\n", result); if (g_fp && strcmp(result, "") != 0) { - taosFprintfFile(g_fp, result); - taosFprintfFile(g_fp, "\n"); + // RES_TYPE__TMQ_BATCH_META + if ((*(int8_t*)msg) == 5) { + cJSON* pJson = cJSON_Parse(result); + cJSON* pJsonArray = cJSON_GetObjectItem(pJson, "metas"); + int32_t num = cJSON_GetArraySize(pJsonArray); + for (int32_t i = 0; i < num; i++) { + cJSON* pJsonItem = cJSON_GetArrayItem(pJsonArray, i); + char* itemStr = cJSON_PrintUnformatted(pJsonItem); + taosFprintfFile(g_fp, itemStr); + tmq_free_json_meta(itemStr); + taosFprintfFile(g_fp, "\n"); + } + cJSON_Delete(pJson); + } else { + taosFprintfFile(g_fp, result); + taosFprintfFile(g_fp, "\n"); + } } } tmq_free_json_meta(result);