add json for batch meta

This commit is contained in:
54liuyao 2024-05-28 16:56:23 +08:00
parent 57bf9d1048
commit d815bb117b
3 changed files with 75 additions and 75 deletions

View File

@ -282,8 +282,7 @@ typedef enum tmq_res_t {
TMQ_RES_INVALID = -1,
TMQ_RES_DATA = 1,
TMQ_RES_TABLE_META = 2,
TMQ_RES_METADATA = 3,
TMQ_RES_BATCH_TABLE_META = 4,
TMQ_RES_METADATA = 3
} tmq_res_t;
typedef struct tmq_topic_assignment {

View File

@ -26,11 +26,13 @@
#define LOG_ID_TAG "connId:0x%" PRIx64 ",reqId:0x%" PRIx64
#define LOG_ID_VALUE *(int64_t*)taos, pRequest->requestId
#define TMQ_META_VERSION "1.0"
static int32_t tmqWriteBatchMetaDataImpl(TAOS* taos, void* meta, int32_t metaLen);
static tb_uid_t processSuid(tb_uid_t suid, char* db) { return suid + MurmurHash3_32(db, strlen(db)); }
static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
static cJSON* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id,
int8_t t, SColCmprWrapper* pColCmprRow) {
int8_t buildDefaultCompress = 0;
if (pColCmprRow->nCols <= 0) {
@ -124,9 +126,7 @@ static char* buildCreateTableJson(SSchemaWrapper* schemaRow, SSchemaWrapper* sch
}
cJSON_AddItemToObject(json, "tags", tags);
string = cJSON_PrintUnformatted(json);
cJSON_Delete(json);
return string;
return json;
}
static int32_t setCompressOption(cJSON* json, uint32_t para) {
@ -153,7 +153,7 @@ static int32_t setCompressOption(cJSON* json, uint32_t para) {
}
return 0;
}
static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
static cJSON* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
SMAlterStbReq req = {0};
cJSON* json = NULL;
char* string = NULL;
@ -247,18 +247,16 @@ static char* buildAlterSTableJson(void* alterData, int32_t alterDataLen) {
default:
break;
}
string = cJSON_PrintUnformatted(json);
end:
cJSON_Delete(json);
tFreeSMAltertbReq(&req);
return string;
return json;
}
static char* processCreateStb(SMqMetaRsp* metaRsp) {
static cJSON* processCreateStb(SMqMetaRsp* metaRsp) {
SVCreateStbReq req = {0};
SDecoder coder;
char* string = NULL;
cJSON* pJson = NULL;
uDebug("create stable data:%p", metaRsp);
// decode and process req
@ -269,17 +267,17 @@ static char* processCreateStb(SMqMetaRsp* metaRsp) {
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
goto _err;
}
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr);
pJson = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE, &req.colCmpr);
_err:
uDebug("create stable return, sql json:%s", string);
uDebug("create stable return, sql json:%s", cJSON_PrintUnformatted(pJson));
tDecoderClear(&coder);
return string;
return pJson;
}
static char* processAlterStb(SMqMetaRsp* metaRsp) {
static cJSON* processAlterStb(SMqMetaRsp* metaRsp) {
SVCreateStbReq req = {0};
SDecoder coder;
char* string = NULL;
cJSON* pJson = NULL;
uDebug("alter stable data:%p", metaRsp);
// decode and process req
@ -290,11 +288,11 @@ static char* processAlterStb(SMqMetaRsp* metaRsp) {
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
goto _err;
}
string = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
pJson = buildAlterSTableJson(req.alterOriData, req.alterOriDataLen);
_err:
uDebug("alter stable return, sql json:%s", string);
uDebug("alter stable return, sql json:%s", cJSON_PrintUnformatted(pJson));
tDecoderClear(&coder);
return string;
return pJson;
}
static void buildChildElement(cJSON* json, SVCreateTbReq* pCreateReq) {
@ -387,7 +385,7 @@ end:
taosArrayDestroy(pTagVals);
}
static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) {
static cJSON* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) {
char* string = NULL;
cJSON* json = cJSON_CreateObject();
if (json == NULL) {
@ -412,16 +410,14 @@ static char* buildCreateCTableJson(SVCreateTbReq* pCreateReq, int32_t nReqs) {
cJSON_AddItemToArray(createList, create);
}
cJSON_AddItemToObject(json, "createList", createList);
string = cJSON_PrintUnformatted(json);
cJSON_Delete(json);
return string;
return json;
}
static char* processCreateTable(SMqMetaRsp* metaRsp) {
static cJSON* processCreateTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0};
SVCreateTbBatchReq req = {0};
SVCreateTbReq* pCreateReq;
char* string = NULL;
cJSON* pJson = NULL;
// decode
uDebug("create table data:%p", metaRsp);
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
@ -435,15 +431,15 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) {
if (req.nReqs > 0) {
pCreateReq = req.pReqs;
if (pCreateReq->type == TSDB_CHILD_TABLE) {
string = buildCreateCTableJson(req.pReqs, req.nReqs);
pJson = buildCreateCTableJson(req.pReqs, req.nReqs);
} else if (pCreateReq->type == TSDB_NORMAL_TABLE) {
string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid,
pJson = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid,
TSDB_NORMAL_TABLE, &pCreateReq->colCmpr);
}
}
_exit:
uDebug("create table return, sql json:%s", string);
uDebug("create table return, sql json:%s", cJSON_PrintUnformatted(pJson));
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
pCreateReq = req.pReqs + iReq;
taosMemoryFreeClear(pCreateReq->comment);
@ -453,7 +449,7 @@ _exit:
}
}
tDecoderClear(&decoder);
return string;
return pJson;
}
static char* processAutoCreateTable(STaosxRsp* rsp) {
@ -482,7 +478,9 @@ static char* processAutoCreateTable(STaosxRsp* rsp) {
goto _exit;
}
}
string = buildCreateCTableJson(pCreateReq, rsp->createTableNum);
cJSON* pJson = buildCreateCTableJson(pCreateReq, rsp->createTableNum);
string = cJSON_PrintUnformatted(pJson);
cJSON_Delete(pJson);
_exit:
uDebug("auto created table return, sql json:%s", string);
for (int i = 0; i < rsp->createTableNum; i++) {
@ -497,7 +495,7 @@ _exit:
return string;
}
static char* processAlterTable(SMqMetaRsp* metaRsp) {
static cJSON* processAlterTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0};
SVAlterTbReq vAlterTbReq = {0};
char* string = NULL;
@ -622,19 +620,16 @@ static char* processAlterTable(SMqMetaRsp* metaRsp) {
default:
break;
}
string = cJSON_PrintUnformatted(json);
_exit:
uDebug("alter table return, sql json:%s", string);
cJSON_Delete(json);
uDebug("alter table return, sql json:%s", cJSON_PrintUnformatted(json));
tDecoderClear(&decoder);
return string;
return json;
}
static char* processDropSTable(SMqMetaRsp* metaRsp) {
static cJSON* processDropSTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0};
SVDropStbReq req = {0};
char* string = NULL;
cJSON* json = NULL;
uDebug("processDropSTable data:%p", metaRsp);
@ -659,18 +654,15 @@ static char* processDropSTable(SMqMetaRsp* metaRsp) {
cJSON* tableName = cJSON_CreateString(req.name);
cJSON_AddItemToObject(json, "tableName", tableName);
string = cJSON_PrintUnformatted(json);
_exit:
uDebug("processDropSTable return, sql json:%s", string);
cJSON_Delete(json);
uDebug("processDropSTable return, sql json:%s", cJSON_PrintUnformatted(json));
tDecoderClear(&decoder);
return string;
return json;
}
static char* processDeleteTable(SMqMetaRsp* metaRsp) {
static cJSON* processDeleteTable(SMqMetaRsp* metaRsp) {
SDeleteRes req = {0};
SDecoder coder = {0};
cJSON* json = NULL;
char* string = NULL;
uDebug("processDeleteTable data:%p", metaRsp);
// decode and process req
@ -698,18 +690,15 @@ static char* processDeleteTable(SMqMetaRsp* metaRsp) {
cJSON* sqlJson = cJSON_CreateString(sql);
cJSON_AddItemToObject(json, "sql", sqlJson);
string = cJSON_PrintUnformatted(json);
_exit:
uDebug("processDeleteTable return, sql json:%s", string);
cJSON_Delete(json);
uDebug("processDeleteTable return, sql json:%s", cJSON_PrintUnformatted(json));
tDecoderClear(&coder);
return string;
return json;
}
static char* processDropTable(SMqMetaRsp* metaRsp) {
static cJSON* processDropTable(SMqMetaRsp* metaRsp) {
SDecoder decoder = {0};
SVDropTbBatchReq req = {0};
char* string = NULL;
cJSON* json = NULL;
uDebug("processDropTable data:%p", metaRsp);
@ -743,12 +732,10 @@ static char* processDropTable(SMqMetaRsp* metaRsp) {
}
cJSON_AddItemToObject(json, "tableNameList", tableNameList);
string = cJSON_PrintUnformatted(json);
_exit:
uDebug("processDropTable return, json sql:%s", string);
cJSON_Delete(json);
uDebug("processDropTable return, json sql:%s", cJSON_PrintUnformatted(json));
tDecoderClear(&decoder);
return string;
return json;
}
static int32_t taosCreateStb(TAOS* taos, void* meta, int32_t metaLen) {
@ -1805,6 +1792,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, int32_t dataLen) {
char err[ERR_MSG_LEN] = {0};
code = rawBlockBindData(pQuery, pTableMeta, rawData, NULL, fields, pSW->nCols, true, err, ERR_MSG_LEN);
taosMemoryFree(fields);
taosMemoryFreeClear(pTableMeta);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("table:%s, err:%s", tbName, err);
goto end;
@ -1997,6 +1985,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, int32_t dataLen)
char err[ERR_MSG_LEN] = {0};
code = rawBlockBindData(pQuery, pTableMeta, rawData, &pCreateReqDst, fields, pSW->nCols, true, err, ERR_MSG_LEN);
taosMemoryFree(fields);
taosMemoryFreeClear(pTableMeta);
if (code != TSDB_CODE_SUCCESS) {
SET_ERROR_MSG("table:%s, err:%s", tbName, err);
goto end;
@ -2027,7 +2016,7 @@ end:
return code;
}
static char* processSimpleMeta(SMqMetaRsp* pMetaRsp) {
static cJSON* processSimpleMeta(SMqMetaRsp* pMetaRsp) {
if (pMetaRsp->resMsgType == TDMT_VND_CREATE_STB) {
return processCreateStb(pMetaRsp);
} else if (pMetaRsp->resMsgType == TDMT_VND_ALTER_STB) {
@ -2051,15 +2040,15 @@ static char* processSimpleMeta(SMqMetaRsp* pMetaRsp) {
static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) {
SDecoder coder;
SMqBatchMetaRsp rsp = {0};
SArray* strArray = NULL;
tDecoderInit(&coder, pMsgRsp->pMetaBuff, pMsgRsp->metaBuffLen);
if (tDecodeMqBatchMetaRsp(&coder, &rsp) < 0) {
goto _end;
}
int64_t fullSize = 0;
cJSON* pJson = cJSON_CreateObject();
cJSON_AddStringToObject(pJson, "tmq_meta_version", TMQ_META_VERSION);
cJSON* pMetaArr = cJSON_CreateArray();
int32_t num = taosArrayGetSize(rsp.batchMetaReq);
strArray = taosArrayInit(num, POINTER_BYTES);
for (int32_t i = 0; i < num; i++) {
int32_t len = *(int32_t*)taosArrayGet(rsp.batchMetaLen, i);
void* tmpBuf = taosArrayGetP(rsp.batchMetaReq, i);
@ -2069,26 +2058,19 @@ static char* processBatchMetaToJson(SMqBatchMetaRsp* pMsgRsp) {
if(tDecodeMqMetaRsp(&metaCoder, &metaRsp) < 0 ) {
goto _end;
}
char* subStr = processSimpleMeta(&metaRsp);
cJSON* pItem = processSimpleMeta(&metaRsp);
tDeleteMqMetaRsp(&metaRsp);
fullSize += strlen(subStr);
taosArrayPush(strArray, &subStr);
cJSON_AddItemToArray(pMetaArr, pItem);
}
char* buf = (char*)taosMemoryCalloc(1, fullSize + num + 1);
for (int32_t i = 0; i < num; i++) {
char* subStr = taosArrayGetP(strArray, i);
strcat(buf, subStr);
if (i < num - 1) {
strcat(buf, "\n");
}
}
taosArrayDestroyP(strArray, taosMemoryFree);
cJSON_AddItemToObject(pJson, "metas", pMetaArr);
tDeleteMqBatchMetaRsp(&rsp);
return buf;
char* fullStr = cJSON_PrintUnformatted(pJson);
cJSON_Delete(pJson);
return fullStr;
_end:
taosArrayDestroyP(strArray, taosMemoryFree);
cJSON_Delete(pJson);
tDeleteMqBatchMetaRsp(&rsp);
return NULL;
}
@ -2109,7 +2091,10 @@ char* tmq_get_json_meta(TAOS_RES* res) {
}
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
return processSimpleMeta(&pMetaRspObj->metaRsp);
cJSON* pJson = processSimpleMeta(&pMetaRspObj->metaRsp);
char* string = cJSON_PrintUnformatted(pJson);
cJSON_Delete(pJson);
return string;
}
void tmq_free_json_meta(char* jsonMeta) { taosMemoryFreeClear(jsonMeta); }

View File

@ -18,6 +18,7 @@
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "cJSON.h"
#include "taos.h"
#include "tmsg.h"
#include "types.h"
@ -61,10 +62,25 @@ static void msg_process(TAOS_RES* msg) {
if (result) {
printf("meta result: %s\n", result);
if (g_fp && strcmp(result, "") != 0) {
// RES_TYPE__TMQ_BATCH_META
if ((*(int8_t*)msg) == 5) {
cJSON* pJson = cJSON_Parse(result);
cJSON* pJsonArray = cJSON_GetObjectItem(pJson, "metas");
int32_t num = cJSON_GetArraySize(pJsonArray);
for (int32_t i = 0; i < num; i++) {
cJSON* pJsonItem = cJSON_GetArrayItem(pJsonArray, i);
char* itemStr = cJSON_PrintUnformatted(pJsonItem);
taosFprintfFile(g_fp, itemStr);
tmq_free_json_meta(itemStr);
taosFprintfFile(g_fp, "\n");
}
cJSON_Delete(pJson);
} else {
taosFprintfFile(g_fp, result);
taosFprintfFile(g_fp, "\n");
}
}
}
tmq_free_json_meta(result);
}