diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 697a53e570..56eb9030fc 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -18,6 +18,7 @@ #include #include #include "taos.h" +#include static int running = 1; static void msg_process(TAOS_RES* msg) { @@ -30,7 +31,11 @@ static void msg_process(TAOS_RES* msg) { void* meta; int32_t metaLen; tmq_get_raw_meta(msg, &meta, &metaLen); - + char* result = tmq_get_json_meta(msg); + if(result){ + printf("meta result: %s\n", result); + free(result); + } printf("meta, len is %d\n", metaLen); return; } @@ -137,8 +142,8 @@ int32_t create_topic() { } taos_free_result(pRes); - /*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/ - pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); + pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); +// pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); return -1; @@ -199,7 +204,7 @@ tmq_t* build_consumer() { tmq_conf_set(conf, "msg.with.table.name", "true"); tmq_conf_set(conf, "enable.auto.commit", "true"); - tmq_conf_set(conf, "experimental.snapshot.enable", "true"); + tmq_conf_set(conf, "experimental.snapshot.enable", "false"); tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); diff --git a/include/client/taos.h b/include/client/taos.h index 79f567fc9a..216a5832b0 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -263,6 +263,8 @@ typedef enum tmq_res_t tmq_res_t; DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, void **raw_meta, int32_t *raw_meta_len); +DLL_EXPORT int32_t taos_write_raw_meta(TAOS *res, void *raw_meta, int32_t raw_meta_len); +DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed. DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); DLL_EXPORT const char *tmq_get_db_name(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_vgroup_id(TAOS_RES *res); diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 667f5b1dbc..71168a0fa6 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -23,6 +23,7 @@ #include "tqueue.h" #include "tref.h" #include "ttimer.h" +#include "cJSON.h" int32_t tmqAskEp(tmq_t* tmq, bool async); @@ -1848,6 +1849,343 @@ int32_t tmq_get_raw_meta(TAOS_RES* res, void** raw_meta, int32_t* raw_meta_len) return -1; } +static char *buildCreateTableJson(SSchemaWrapper *schemaRow, SSchemaWrapper* schemaTag, char* name, int64_t id, int8_t t){ + char* string = NULL; + cJSON* json = cJSON_CreateObject(); + if (json == NULL) { + return string; + } + cJSON* type = cJSON_CreateString("create"); + cJSON_AddItemToObject(json, "type", type); + cJSON* uid = cJSON_CreateNumber(id); + cJSON_AddItemToObject(json, "uid", uid); + cJSON* tableName = cJSON_CreateString(name); + cJSON_AddItemToObject(json, "tableName", tableName); + cJSON* tableType = cJSON_CreateString(t == TSDB_NORMAL_TABLE ? "normal" : "super"); + cJSON_AddItemToObject(json, "tableType", tableType); +// cJSON* version = cJSON_CreateNumber(1); +// cJSON_AddItemToObject(json, "version", version); + + cJSON* columns = cJSON_CreateArray(); + for(int i = 0; i < schemaRow->nCols; i++){ + cJSON* column = cJSON_CreateObject(); + SSchema *s = schemaRow->pSchema + i; + cJSON* cname = cJSON_CreateString(s->name); + cJSON_AddItemToObject(column, "name", cname); + cJSON* ctype = cJSON_CreateNumber(s->type); + cJSON_AddItemToObject(column, "type", ctype); + cJSON* cbytes = cJSON_CreateNumber(s->bytes); + cJSON_AddItemToObject(column, "bytes", cbytes); + cJSON_AddItemToArray(columns, column); + } + cJSON_AddItemToObject(json, "columns", columns); + + cJSON* tags = cJSON_CreateArray(); + for(int i = 0; schemaTag && i < schemaTag->nCols; i++){ + cJSON* tag = cJSON_CreateObject(); + SSchema *s = schemaTag->pSchema + i; + cJSON* tname = cJSON_CreateString(s->name); + cJSON_AddItemToObject(tag, "name", tname); + cJSON* ttype = cJSON_CreateNumber(s->type); + cJSON_AddItemToObject(tag, "type", ttype); + cJSON* tbytes = cJSON_CreateNumber(s->bytes); + cJSON_AddItemToObject(tag, "bytes", tbytes); + cJSON_AddItemToArray(tags, tag); + } + cJSON_AddItemToObject(json, "tags", tags); + + string = cJSON_PrintUnformatted(json); + cJSON_Delete(json); + return string; +} + +static char *processCreateStb(SMqMetaRsp *metaRsp){ + SVCreateStbReq req = {0}; + SDecoder coder; + char* string = NULL; + + // decode and process req + void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); + int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); + tDecoderInit(&coder, data, len); + + if (tDecodeSVCreateStbReq(&coder, &req) < 0) { + goto _err; + } + string = buildCreateTableJson(&req.schemaRow, &req.schemaTag, req.name, req.suid, TSDB_SUPER_TABLE); + tDecoderClear(&coder); + return string; + +_err: + tDecoderClear(&coder); + return string; +} + +static char *buildCreateCTableJson(STag* pTag, int64_t sid, char* name, int64_t id){ + char* string = NULL; + cJSON* json = cJSON_CreateObject(); + if (json == NULL) { + return string; + } + cJSON* type = cJSON_CreateString("create"); + cJSON_AddItemToObject(json, "type", type); + cJSON* uid = cJSON_CreateNumber(id); + cJSON_AddItemToObject(json, "uid", uid); + cJSON* tableName = cJSON_CreateString(name); + cJSON_AddItemToObject(json, "tableName", tableName); + cJSON* tableType = cJSON_CreateString("child"); + cJSON_AddItemToObject(json, "tableType", tableType); + cJSON* using = cJSON_CreateNumber(sid); + cJSON_AddItemToObject(json, "using", using); +// cJSON* version = cJSON_CreateNumber(1); +// cJSON_AddItemToObject(json, "version", version); + + cJSON* tags = cJSON_CreateArray(); + + if (tTagIsJson(pTag)) { // todo + char* pJson = parseTagDatatoJson(pTag); + + cJSON* tag = cJSON_CreateObject(); + cJSON* tname = cJSON_CreateNumber(1); // todo + cJSON_AddItemToObject(tag, "cid", tname); + cJSON* ttype = cJSON_CreateNumber(TSDB_DATA_TYPE_JSON); + cJSON_AddItemToObject(tag, "type", ttype); + cJSON* tvalue = cJSON_CreateString(pJson); // todo + cJSON_AddItemToObject(tag, "value", tvalue); + cJSON_AddItemToArray(tags, tag); + cJSON_AddItemToObject(json, "tags", tags); + + string = cJSON_PrintUnformatted(json); + goto end; + } + + SArray* pTagVals = NULL; + int32_t code = tTagToValArray(pTag, &pTagVals); + if (code) { + goto end; + } + + for(int i = 0; taosArrayGetSize(pTagVals); i++){ + STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, i); + + cJSON* tag = cJSON_CreateObject(); + cJSON* tname = cJSON_CreateNumber(pTagVal->cid); + cJSON_AddItemToObject(tag, "cid", tname); + cJSON* ttype = cJSON_CreateNumber(pTagVal->type); + cJSON_AddItemToObject(tag, "type", ttype); + cJSON* tvalue = cJSON_CreateString("todo"); // todo + cJSON_AddItemToObject(tag, "value", tvalue); + cJSON_AddItemToArray(tags, tag); + } + cJSON_AddItemToObject(json, "tags", tags); + string = cJSON_PrintUnformatted(json); + +end: + + cJSON_Delete(json); + return string; +} + +static char *processCreateTable(SMqMetaRsp *metaRsp){ + SDecoder decoder = {0}; + SVCreateTbBatchReq req = {0}; + SVCreateTbReq *pCreateReq; + char *string = NULL; + // decode + void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); + int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); + tDecoderInit(&decoder, data, len); + if (tDecodeSVCreateTbBatchReq(&decoder, &req) < 0) { + goto _exit; + } + + // loop to create table + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { + pCreateReq = req.pReqs + iReq; + if(pCreateReq->type == TSDB_CHILD_TABLE){ + string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.suid, pCreateReq->name, pCreateReq->uid); + }else if(pCreateReq->type == TSDB_NORMAL_TABLE){ + string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE); + } + } + + tDecoderClear(&decoder); + + _exit: + tDecoderClear(&decoder); + return string; +} + +static char *processAlterTable(SMqMetaRsp *metaRsp){ + SDecoder decoder = {0}; + SVAlterTbReq vAlterTbReq = {0}; + char *string = NULL; + + // decode + void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); + int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); + tDecoderInit(&decoder, data, len); + if (tDecodeSVAlterTbReq(&decoder, &vAlterTbReq) < 0) { + goto _exit; + } + + cJSON* json = cJSON_CreateObject(); + if (json == NULL) { + goto _exit; + } + cJSON* type = cJSON_CreateString("alter"); + cJSON_AddItemToObject(json, "type", type); +// cJSON* uid = cJSON_CreateNumber(id); +// cJSON_AddItemToObject(json, "uid", uid); + cJSON* tableName = cJSON_CreateString(vAlterTbReq.tbName); + cJSON_AddItemToObject(json, "tableName", tableName); + cJSON* tableType = cJSON_CreateString("normal"); + cJSON_AddItemToObject(json, "tableType", tableType); + + switch (vAlterTbReq.action) { + case TSDB_ALTER_TABLE_ADD_COLUMN: { + cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_ADD_COLUMN); + cJSON_AddItemToObject(json, "alterType", alterType); + cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); + cJSON_AddItemToObject(json, "colName", colName); + cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type); + cJSON_AddItemToObject(json, "colType", colType); + cJSON* colBytes = cJSON_CreateNumber(vAlterTbReq.bytes); + cJSON_AddItemToObject(json, "colBytes", colBytes); + break; + } + case TSDB_ALTER_TABLE_DROP_COLUMN:{ + cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_DROP_COLUMN); + cJSON_AddItemToObject(json, "alterType", alterType); + cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); + cJSON_AddItemToObject(json, "colName", colName); + break; + } + case TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES:{ + cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_BYTES); + cJSON_AddItemToObject(json, "alterType", alterType); + cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); + cJSON_AddItemToObject(json, "colName", colName); + cJSON* colType = cJSON_CreateNumber(vAlterTbReq.type); + cJSON_AddItemToObject(json, "colType", colType); + cJSON* colBytes = cJSON_CreateNumber(vAlterTbReq.colModBytes); + cJSON_AddItemToObject(json, "colBytes", colBytes); + break; + } + case TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME:{ + cJSON* alterType = cJSON_CreateNumber(TSDB_ALTER_TABLE_UPDATE_COLUMN_NAME); + cJSON_AddItemToObject(json, "alterType", alterType); + cJSON* colName = cJSON_CreateString(vAlterTbReq.colName); + cJSON_AddItemToObject(json, "colName", colName); + cJSON* colNewName = cJSON_CreateString(vAlterTbReq.colNewName); + cJSON_AddItemToObject(json, "colNewName", colNewName); + break; + } + default: + break; + } + string = cJSON_PrintUnformatted(json); + + _exit: + tDecoderClear(&decoder); + return string; +} + +static char *processDropSTable(SMqMetaRsp *metaRsp){ + SDecoder decoder = {0}; + SVDropStbReq req = {0}; + char *string = NULL; + + // decode + void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); + int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); + tDecoderInit(&decoder, data, len); + if (tDecodeSVDropStbReq(&decoder, &req) < 0) { + goto _exit; + } + + cJSON* json = cJSON_CreateObject(); + if (json == NULL) { + goto _exit; + } + cJSON* type = cJSON_CreateString("drop"); + cJSON_AddItemToObject(json, "type", type); + cJSON* uid = cJSON_CreateNumber(req.suid); + cJSON_AddItemToObject(json, "uid", uid); + cJSON* tableName = cJSON_CreateString(req.name); + cJSON_AddItemToObject(json, "tableName", tableName); + cJSON* tableType = cJSON_CreateString("super"); + cJSON_AddItemToObject(json, "tableType", tableType); + + string = cJSON_PrintUnformatted(json); + + _exit: + tDecoderClear(&decoder); + return string; +} + +static char *processDropTable(SMqMetaRsp *metaRsp){ + SDecoder decoder = {0}; + SVDropTbBatchReq req = {0}; + char *string = NULL; + + // decode + void* data = POINTER_SHIFT(metaRsp->metaRsp, sizeof(SMsgHead)); + int32_t len = metaRsp->metaRspLen - sizeof(SMsgHead); + tDecoderInit(&decoder, data, len); + if (tDecodeSVDropTbBatchReq(&decoder, &req) < 0) { + goto _exit; + } + + cJSON* json = cJSON_CreateObject(); + if (json == NULL) { + goto _exit; + } + cJSON* type = cJSON_CreateString("drop"); + cJSON_AddItemToObject(json, "type", type); +// cJSON* uid = cJSON_CreateNumber(id); +// cJSON_AddItemToObject(json, "uid", uid); + cJSON* tableType = cJSON_CreateString("normal"); + cJSON_AddItemToObject(json, "tableType", tableType); + + cJSON* tableNameList = cJSON_CreateArray(); + for (int32_t iReq = 0; iReq < req.nReqs; iReq++) { + SVDropTbReq* pDropTbReq = req.pReqs + iReq; + + cJSON* tableName = cJSON_CreateString(pDropTbReq->name); // todo + cJSON_AddItemToArray(tableNameList, tableName); + } + cJSON_AddItemToObject(json, "tableNameList", tableNameList); + + string = cJSON_PrintUnformatted(json); + + _exit: + tDecoderClear(&decoder); + return string; +} + +char *tmq_get_json_meta(TAOS_RES *res){ + if (!TD_RES_TMQ_META(res)) { + return NULL; + } + + SMqMetaRspObj* pMetaRspObj = (SMqMetaRspObj*)res; + if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_STB){ + return processCreateStb(&pMetaRspObj->metaRsp); + }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_STB){ + + }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_STB){ + return processDropSTable(&pMetaRspObj->metaRsp); + }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_CREATE_TABLE){ + return processCreateTable(&pMetaRspObj->metaRsp); + }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_ALTER_TABLE){ + return processAlterTable(&pMetaRspObj->metaRsp); + }else if(pMetaRspObj->metaRsp.resMsgType == TDMT_VND_DROP_TABLE){ + return processDropTable(&pMetaRspObj->metaRsp); + } + return NULL; +} + void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) { tmqCommitInner2(tmq, msg, 0, 1, cb, param); }