feat: add get meta interface for tmq
This commit is contained in:
parent
a558672622
commit
a74f19671d
|
@ -18,6 +18,7 @@
|
|||
#include <string.h>
|
||||
#include <time.h>
|
||||
#include "taos.h"
|
||||
#include <stdlib.h>
|
||||
|
||||
static int running = 1;
|
||||
static void msg_process(TAOS_RES* msg) {
|
||||
|
@ -30,7 +31,11 @@ static void msg_process(TAOS_RES* msg) {
|
|||
void* meta;
|
||||
int32_t metaLen;
|
||||
tmq_get_raw_meta(msg, &meta, &metaLen);
|
||||
|
||||
char* result = tmq_get_json_meta(msg);
|
||||
if(result){
|
||||
printf("meta result: %s\n", result);
|
||||
free(result);
|
||||
}
|
||||
printf("meta, len is %d\n", metaLen);
|
||||
return;
|
||||
}
|
||||
|
@ -137,8 +142,8 @@ int32_t create_topic() {
|
|||
}
|
||||
taos_free_result(pRes);
|
||||
|
||||
/*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/
|
||||
pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
|
||||
pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");
|
||||
// pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1");
|
||||
if (taos_errno(pRes) != 0) {
|
||||
printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes));
|
||||
return -1;
|
||||
|
@ -199,7 +204,7 @@ tmq_t* build_consumer() {
|
|||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||
tmq_conf_set(conf, "enable.auto.commit", "true");
|
||||
|
||||
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
|
||||
tmq_conf_set(conf, "experimental.snapshot.enable", "false");
|
||||
|
||||
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
|
|
|
@ -263,6 +263,8 @@ typedef enum tmq_res_t tmq_res_t;
|
|||
|
||||
DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res);
|
||||
DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, void **raw_meta, int32_t *raw_meta_len);
|
||||
DLL_EXPORT int32_t taos_write_raw_meta(TAOS *res, void *raw_meta, int32_t raw_meta_len);
|
||||
DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed.
|
||||
DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res);
|
||||
DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res);
|
||||
DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res);
|
||||
|
|
|
@ -23,6 +23,7 @@
|
|||
#include "tqueue.h"
|
||||
#include "tref.h"
|
||||
#include "ttimer.h"
|
||||
#include "cJSON.h"
|
||||
|
||||
int32_t tmqAskEp(tmq_t* tmq, bool async);
|
||||
|
||||
|
@ -1848,6 +1849,343 @@ int32_t tmq_get_raw_meta(TAOS_RES* res, void** raw_meta, int32_t* raw_meta_len)
|
|||
return -1;
|
||||
}
|
||||
|
||||
static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){
|
||||
char* string = NULL;
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
return string;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("create");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
cJSON* uid = cJSON_CreateNumber(id);
|
||||
cJSON_AddItemToObject(json, "uid", uid);
|
||||
cJSON* tableName = cJSON_CreateString(name);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
// cJSON* version = cJSON_CreateNumber(1);
|
||||
// cJSON_AddItemToObject(json, "version", version);
|
||||
|
||||
cJSON* columns = cJSON_CreateArray();
|
||||
for(int i = 0; i < schemaRow->nCols; i++){
|
||||
cJSON* column = cJSON_CreateObject();
|
||||
SSchema *s = schemaRow->pSchema + i;
|
||||
cJSON* cname = cJSON_CreateString(s->name);
|
||||
cJSON_AddItemToObject(column, "name", cname);
|
||||
cJSON* ctype = cJSON_CreateNumber(s->type);
|
||||
cJSON_AddItemToObject(column, "type", ctype);
|
||||
cJSON* cbytes = cJSON_CreateNumber(s->bytes);
|
||||
cJSON_AddItemToObject(column, "bytes", cbytes);
|
||||
cJSON_AddItemToArray(columns, column);
|
||||
}
|
||||
cJSON_AddItemToObject(json, "columns", columns);
|
||||
|
||||
cJSON* tags = cJSON_CreateArray();
|
||||
for(int i = 0; schemaTag && i < schemaTag->nCols; i++){
|
||||
cJSON* tag = cJSON_CreateObject();
|
||||
SSchema *s = schemaTag->pSchema + i;
|
||||
cJSON* tname = cJSON_CreateString(s->name);
|
||||
cJSON_AddItemToObject(tag, "name", tname);
|
||||
cJSON* ttype = cJSON_CreateNumber(s->type);
|
||||
cJSON_AddItemToObject(tag, "type", ttype);
|
||||
cJSON* tbytes = cJSON_CreateNumber(s->bytes);
|
||||
cJSON_AddItemToObject(tag, "bytes", tbytes);
|
||||
cJSON_AddItemToArray(tags, tag);
|
||||
}
|
||||
cJSON_AddItemToObject(json, "tags", tags);
|
||||
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
cJSON_Delete(json);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *processCreateStb(SMqMetaRsp *metaRsp){
|
||||
SVCreateStbReq req = {0};
|
||||
SDecoder coder;
|
||||
char* string = NULL;
|
||||
|
||||
// decode and process req
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&coder, data, len);
|
||||
|
||||
if (tDecodeSVCreateStbReq(&coder, &req) < 0) {
|
||||
goto _err;
|
||||
}
|
||||
string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE);
|
||||
tDecoderClear(&coder);
|
||||
return string;
|
||||
|
||||
_err:
|
||||
tDecoderClear(&coder);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t id){
|
||||
char* string = NULL;
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
return string;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("create");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
cJSON* uid = cJSON_CreateNumber(id);
|
||||
cJSON_AddItemToObject(json, "uid", uid);
|
||||
cJSON* tableName = cJSON_CreateString(name);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString("child");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
cJSON* using = cJSON_CreateNumber(sid);
|
||||
cJSON_AddItemToObject(json, "using", using);
|
||||
// cJSON* version = cJSON_CreateNumber(1);
|
||||
// cJSON_AddItemToObject(json, "version", version);
|
||||
|
||||
cJSON* tags = cJSON_CreateArray();
|
||||
|
||||
if (tTagIsJson(pTag)) { // todo
|
||||
char* pJson = parseTagDatatoJson(pTag);
|
||||
|
||||
cJSON* tag = cJSON_CreateObject();
|
||||
cJSON* tname = cJSON_CreateNumber(1); // todo
|
||||
cJSON_AddItemToObject(tag, "cid", tname);
|
||||
cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON);
|
||||
cJSON_AddItemToObject(tag, "type", ttype);
|
||||
cJSON* tvalue = cJSON_CreateString(pJson); // todo
|
||||
cJSON_AddItemToObject(tag, "value", tvalue);
|
||||
cJSON_AddItemToArray(tags, tag);
|
||||
cJSON_AddItemToObject(json, "tags", tags);
|
||||
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
goto end;
|
||||
}
|
||||
|
||||
SArray* pTagVals = NULL;
|
||||
int32_t code = tTagToValArray(pTag, &pTagVals);
|
||||
if (code) {
|
||||
goto end;
|
||||
}
|
||||
|
||||
for(int i = 0; taosArrayGetSize(pTagVals); i++){
|
||||
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i);
|
||||
|
||||
cJSON* tag = cJSON_CreateObject();
|
||||
cJSON* tname = cJSON_CreateNumber(pTagVal->cid);
|
||||
cJSON_AddItemToObject(tag, "cid", tname);
|
||||
cJSON* ttype = cJSON_CreateNumber(pTagVal->type);
|
||||
cJSON_AddItemToObject(tag, "type", ttype);
|
||||
cJSON* tvalue = cJSON_CreateString("todo"); // todo
|
||||
cJSON_AddItemToObject(tag, "value", tvalue);
|
||||
cJSON_AddItemToArray(tags, tag);
|
||||
}
|
||||
cJSON_AddItemToObject(json, "tags", tags);
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
|
||||
end:
|
||||
|
||||
cJSON_Delete(json);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *processCreateTable(SMqMetaRsp *metaRsp){
|
||||
SDecoder decoder = {0};
|
||||
SVCreateTbBatchReq req = {0};
|
||||
SVCreateTbReq *pCreateReq;
|
||||
char *string = NULL;
|
||||
// decode
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&decoder, data, len);
|
||||
if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
// loop to create table
|
||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||
pCreateReq = req.pReqs + iReq;
|
||||
if(pCreateReq->type == TSDB_CHILD_TABLE){
|
||||
string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.suid, pCreateReq->name, pCreateReq->uid);
|
||||
}else if(pCreateReq->type == TSDB_NORMAL_TABLE){
|
||||
string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE);
|
||||
}
|
||||
}
|
||||
|
||||
tDecoderClear(&decoder);
|
||||
|
||||
_exit:
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *processAlterTable(SMqMetaRsp *metaRsp){
|
||||
SDecoder decoder = {0};
|
||||
SVAlterTbReq vAlterTbReq = {0};
|
||||
char *string = NULL;
|
||||
|
||||
// decode
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&decoder, data, len);
|
||||
if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
goto _exit;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("alter");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
// cJSON* uid = cJSON_CreateNumber(id);
|
||||
// cJSON_AddItemToObject(json, "uid", uid);
|
||||
cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString("normal");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
|
||||
switch (vAlterTbReq.action) {
|
||||
case TSDB_ALTER_TABLE_ADD_COLUMN: {
|
||||
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_ADD_COLUMN);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||
cJSON_AddItemToObject(json, "colName", colName);
|
||||
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
|
||||
cJSON_AddItemToObject(json, "colType", colType);
|
||||
cJSON* colBytes = cJSON_CreateNumber(vAlterTbReq.bytes);
|
||||
cJSON_AddItemToObject(json, "colBytes", colBytes);
|
||||
break;
|
||||
}
|
||||
case TSDB_ALTER_TABLE_DROP_COLUMN:{
|
||||
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_DROP_COLUMN);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||
cJSON_AddItemToObject(json, "colName", colName);
|
||||
break;
|
||||
}
|
||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:{
|
||||
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||
cJSON_AddItemToObject(json, "colName", colName);
|
||||
cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type);
|
||||
cJSON_AddItemToObject(json, "colType", colType);
|
||||
cJSON* colBytes = cJSON_CreateNumber(vAlterTbReq.colModBytes);
|
||||
cJSON_AddItemToObject(json, "colBytes", colBytes);
|
||||
break;
|
||||
}
|
||||
case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:{
|
||||
cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME);
|
||||
cJSON_AddItemToObject(json, "alterType", alterType);
|
||||
cJSON* colName = cJSON_CreateString(vAlterTbReq.colName);
|
||||
cJSON_AddItemToObject(json, "colName", colName);
|
||||
cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName);
|
||||
cJSON_AddItemToObject(json, "colNewName", colNewName);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
break;
|
||||
}
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
|
||||
_exit:
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *processDropSTable(SMqMetaRsp *metaRsp){
|
||||
SDecoder decoder = {0};
|
||||
SVDropStbReq req = {0};
|
||||
char *string = NULL;
|
||||
|
||||
// decode
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&decoder, data, len);
|
||||
if (tDecodeSVDropStbReq(&decoder, &req) < 0) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
goto _exit;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("drop");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
cJSON* uid = cJSON_CreateNumber(req.suid);
|
||||
cJSON_AddItemToObject(json, "uid", uid);
|
||||
cJSON* tableName = cJSON_CreateString(req.name);
|
||||
cJSON_AddItemToObject(json, "tableName", tableName);
|
||||
cJSON* tableType = cJSON_CreateString("super");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
|
||||
_exit:
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
||||
static char *processDropTable(SMqMetaRsp *metaRsp){
|
||||
SDecoder decoder = {0};
|
||||
SVDropTbBatchReq req = {0};
|
||||
char *string = NULL;
|
||||
|
||||
// decode
|
||||
void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead));
|
||||
int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead);
|
||||
tDecoderInit(&decoder, data, len);
|
||||
if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) {
|
||||
goto _exit;
|
||||
}
|
||||
|
||||
cJSON* json = cJSON_CreateObject();
|
||||
if (json == NULL) {
|
||||
goto _exit;
|
||||
}
|
||||
cJSON* type = cJSON_CreateString("drop");
|
||||
cJSON_AddItemToObject(json, "type", type);
|
||||
// cJSON* uid = cJSON_CreateNumber(id);
|
||||
// cJSON_AddItemToObject(json, "uid", uid);
|
||||
cJSON* tableType = cJSON_CreateString("normal");
|
||||
cJSON_AddItemToObject(json, "tableType", tableType);
|
||||
|
||||
cJSON* tableNameList = cJSON_CreateArray();
|
||||
for (int32_t iReq = 0; iReq < req.nReqs; iReq++) {
|
||||
SVDropTbReq* pDropTbReq = req.pReqs + iReq;
|
||||
|
||||
cJSON* tableName = cJSON_CreateString(pDropTbReq->name); // todo
|
||||
cJSON_AddItemToArray(tableNameList, tableName);
|
||||
}
|
||||
cJSON_AddItemToObject(json, "tableNameList", tableNameList);
|
||||
|
||||
string = cJSON_PrintUnformatted(json);
|
||||
|
||||
_exit:
|
||||
tDecoderClear(&decoder);
|
||||
return string;
|
||||
}
|
||||
|
||||
char *tmq_get_json_meta(TAOS_RES *res){
|
||||
if (!TD_RES_TMQ_META(res)) {
|
||||
return NULL;
|
||||
}
|
||||
|
||||
SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res;
|
||||
if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB){
|
||||
return processCreateStb(&pMetaRspObj->metaRsp);
|
||||
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB){
|
||||
|
||||
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB){
|
||||
return processDropSTable(&pMetaRspObj->metaRsp);
|
||||
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE){
|
||||
return processCreateTable(&pMetaRspObj->metaRsp);
|
||||
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE){
|
||||
return processAlterTable(&pMetaRspObj->metaRsp);
|
||||
}else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE){
|
||||
return processDropTable(&pMetaRspObj->metaRsp);
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) {
|
||||
tmqCommitInner2(tmq, msg, 0, 1, cb, param);
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue