diff --git a/examples/c/CMakeLists.txt b/examples/c/CMakeLists.txt index 4a9007acec..9d06dbac6d 100644 --- a/examples/c/CMakeLists.txt +++ b/examples/c/CMakeLists.txt @@ -13,9 +13,15 @@ IF (TD_LINUX) #TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua) add_executable(tmq "") + add_executable(tmq_taosx "") add_executable(stream_demo "") add_executable(demoapi "") + target_sources(tmq_taosx + PRIVATE + "tmq_taosx.c" + ) + target_sources(tmq PRIVATE "tmq.c" @@ -35,6 +41,10 @@ IF (TD_LINUX) taos_static ) + target_link_libraries(tmq_taosx + taos_static + ) + target_link_libraries(stream_demo taos_static ) @@ -47,6 +57,10 @@ IF (TD_LINUX) PUBLIC "${TD_SOURCE_DIR}/include/os" PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" ) + target_include_directories(tmq_taosx + PUBLIC "${TD_SOURCE_DIR}/include/os" + PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" + ) target_include_directories(stream_demo PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc" @@ -59,6 +73,7 @@ IF (TD_LINUX) ) SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq) + SET_TARGET_PROPERTIES(tmq_taosx PROPERTIES OUTPUT_NAME tmq_taosx) SET_TARGET_PROPERTIES(stream_demo PROPERTIES OUTPUT_NAME stream_demo) SET_TARGET_PROPERTIES(demoapi PROPERTIES OUTPUT_NAME demoapi) ENDIF () diff --git a/examples/c/tmq.c b/examples/c/tmq.c index 7e9a24ff1f..1d2b26624b 100644 --- a/examples/c/tmq.c +++ b/examples/c/tmq.c @@ -302,7 +302,7 @@ int32_t create_topic() { } taos_free_result(pRes); - /*pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1");*/ +// pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); pRes = taos_query(pConn, "create topic topic_ctb_column as select ts, c1, c2, c3 from st1"); if (taos_errno(pRes) != 0) { printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); diff --git a/examples/c/tmq_taosx.c b/examples/c/tmq_taosx.c new file mode 100644 index 0000000000..13f3b18e64 --- /dev/null +++ b/examples/c/tmq_taosx.c @@ -0,0 +1,459 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "taos.h" + +static int running = 1; + +static TAOS* use_db(){ + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return NULL; + } + + TAOS_RES* pRes = taos_query(pConn, "use db_taosx"); + if (taos_errno(pRes) != 0) { + printf("error in use db_taosx, reason:%s\n", taos_errstr(pRes)); + return NULL; + } + taos_free_result(pRes); + return pConn; +} + +static void msg_process(TAOS_RES* msg) { + /*memset(buf, 0, 1024);*/ + printf("-----------topic-------------: %s\n", tmq_get_topic_name(msg)); + printf("db: %s\n", tmq_get_db_name(msg)); + printf("vg: %d\n", tmq_get_vgroup_id(msg)); + TAOS *pConn = use_db(); + if (tmq_get_res_type(msg) == TMQ_RES_TABLE_META) { + char* result = tmq_get_json_meta(msg); + if (result) { + printf("meta result: %s\n", result); + } + tmq_free_json_meta(result); + + + tmq_raw_data raw = {0}; + tmq_get_raw_meta(msg, &raw); + int32_t ret = taos_write_raw_meta(pConn, raw); + printf("write raw meta: %s\n", tmq_err2str(ret)); + } + + if(tmq_get_res_type(msg) == TMQ_RES_DATA){ + int32_t ret =taos_write_raw_data(pConn, msg); + printf("write raw data: %s\n", tmq_err2str(ret)); + } + taos_close(pConn); +} + +int32_t init_env() { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + TAOS_RES* pRes = taos_query(pConn, "drop database if exists db_taosx"); + if (taos_errno(pRes) != 0) { + printf("error in drop db_taosx, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists db_taosx vgroups 4"); + if (taos_errno(pRes) != 0) { + printf("error in create db_taosx, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "drop database if exists abc1"); + if (taos_errno(pRes) != 0) { + printf("error in drop db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create database if not exists abc1 vgroups 3"); + if (taos_errno(pRes) != 0) { + printf("error in create db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, + "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " + "nchar(8), t4 bool)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists ct0 using st1 tags(1000, \"ttt\", true)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table tu1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct0 values(now, 1, 2, 'a')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct0, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists ct1 using st1(t1) tags(2000)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table ct1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists ct2 using st1(t1) tags(NULL)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table ct2, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct1 values(now, 3, 4, 'b')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists ct3 using st1(t1) tags(3000)"); + if (taos_errno(pRes) != 0) { + printf("failed to create child table ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct3 values(now, 5, 6, 'c') ct1 values(now+1s, 2, 3, 'sds') (now+2s, 4, 5, 'ddd') ct0 values(now+1s, 4, 3, 'hwj') ct1 values(now+5s, 23, 32, 's21ds')"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table st1 add column c4 bigint"); + if (taos_errno(pRes) != 0) { + printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table st1 modify column c3 binary(64)"); + if (taos_errno(pRes) != 0) { + printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into ct3 values(now+7s, 53, 63, 'cffffffffffffffffffffffffffff', 8989898899999) (now+9s, 51, 62, 'c333', 940)"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table st1 add tag t2 binary(64)"); + if (taos_errno(pRes) != 0) { + printf("failed to alter super table st1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table ct3 set tag t1=5000"); + if (taos_errno(pRes) != 0) { + printf("failed to slter child table ct3, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + +// pRes = taos_query(pConn, "drop table ct3 ct1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop child table ct3, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "drop table st1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); + + pRes = taos_query(pConn, "create table if not exists n1(ts timestamp, c1 int, c2 nchar(4))"); + if (taos_errno(pRes) != 0) { + printf("failed to create normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table n1 add column c3 bigint"); + if (taos_errno(pRes) != 0) { + printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table n1 modify column c2 nchar(8)"); + if (taos_errno(pRes) != 0) { + printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table n1 rename column c3 cc3"); + if (taos_errno(pRes) != 0) { + printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table n1 comment 'hello'"); + if (taos_errno(pRes) != 0) { + printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "alter table n1 drop column c1"); + if (taos_errno(pRes) != 0) { + printf("failed to alter normal table n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "insert into n1 values(now, 'eeee', 8989898899999) (now+9s, 'c333', 940)"); + if (taos_errno(pRes) != 0) { + printf("failed to insert into n1, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + +// pRes = taos_query(pConn, "drop table n1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop normal table n1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); + + pRes = taos_query(pConn, "create table jt(ts timestamp, i int) tags(t json)"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table jt1 using jt tags('{\"k1\":1, \"k2\":\"hello\"}')"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table jt, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create table jt2 using jt tags('')"); + if (taos_errno(pRes) != 0) { + printf("failed to create super table jt2, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + +// pRes = taos_query(pConn, +// "create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 " +// "nchar(8), t4 bool)"); +// if (taos_errno(pRes) != 0) { +// printf("failed to create super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); +// +// pRes = taos_query(pConn, "drop table st1"); +// if (taos_errno(pRes) != 0) { +// printf("failed to drop super table st1, reason:%s\n", taos_errstr(pRes)); +// return -1; +// } +// taos_free_result(pRes); + + taos_close(pConn); + return 0; +} + +int32_t create_topic() { + printf("create topic\n"); + TAOS_RES* pRes; + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + if (pConn == NULL) { + return -1; + } + + pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + pRes = taos_query(pConn, "create topic topic_ctb_column with meta as database abc1"); + if (taos_errno(pRes) != 0) { + printf("failed to create topic topic_ctb_column, reason:%s\n", taos_errstr(pRes)); + return -1; + } + taos_free_result(pRes); + + taos_close(pConn); + return 0; +} + +void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + printf("commit %d tmq %p param %p\n", code, tmq, param); +} + +tmq_t* build_consumer() { +#if 0 + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); +#endif + + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg2"); + tmq_conf_set(conf, "client.id", "my app 1"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set(conf, "enable.auto.commit", "true"); + + /*tmq_conf_set(conf, "experimental.snapshot.enable", "true");*/ + + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + assert(tmq); + tmq_conf_destroy(conf); + return tmq; +} + +tmq_list_t* build_topic_list() { + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "topic_ctb_column"); + /*tmq_list_append(topic_list, "tmq_test_db_multi_insert_topic");*/ + return topic_list; +} + +void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { + int32_t code; + + if ((code = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); + printf("subscribe err\n"); + return; + } + int32_t cnt = 0; + while (running) { + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, -1); + if (tmqmessage) { + cnt++; + msg_process(tmqmessage); + /*if (cnt >= 2) break;*/ + /*printf("get data\n");*/ + taos_free_result(tmqmessage); + /*} else {*/ + /*break;*/ + /*tmq_commit_sync(tmq, NULL);*/ + } + } + + code = tmq_consumer_close(tmq); + if (code) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + +void sync_consume_loop(tmq_t* tmq, tmq_list_t* topics) { + static const int MIN_COMMIT_COUNT = 1; + + int msg_count = 0; + int32_t code; + + if ((code = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); + return; + } + + tmq_list_t* subList = NULL; + tmq_subscription(tmq, &subList); + char** subTopics = tmq_list_to_c_array(subList); + int32_t sz = tmq_list_get_size(subList); + printf("subscribed topics: "); + for (int32_t i = 0; i < sz; i++) { + printf("%s, ", subTopics[i]); + } + printf("\n"); + tmq_list_destroy(subList); + + while (running) { + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); + if (tmqmessage) { + msg_process(tmqmessage); + taos_free_result(tmqmessage); + + /*tmq_commit_sync(tmq, NULL);*/ + /*if ((++msg_count % MIN_COMMIT_COUNT) == 0) tmq_commit(tmq, NULL, 0);*/ + } + } + + code = tmq_consumer_close(tmq); + if (code) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + +int main(int argc, char* argv[]) { + printf("env init\n"); + if (init_env() < 0) { + return -1; + } + create_topic(); + + tmq_t* tmq = build_consumer(); + tmq_list_t* topic_list = build_topic_list(); + basic_consume_loop(tmq, topic_list); + /*sync_consume_loop(tmq, topic_list);*/ +} diff --git a/include/client/taos.h b/include/client/taos.h index f8af010aa6..5f147bb07c 100644 --- a/include/client/taos.h +++ b/include/client/taos.h @@ -270,6 +270,7 @@ typedef enum tmq_res_t tmq_res_t; DLL_EXPORT tmq_res_t tmq_get_res_type(TAOS_RES *res); DLL_EXPORT int32_t tmq_get_raw_meta(TAOS_RES *res, tmq_raw_data *raw_meta); DLL_EXPORT int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta); +DLL_EXPORT int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *res); DLL_EXPORT char *tmq_get_json_meta(TAOS_RES *res); // Returning null means error. Returned result need to be freed by tmq_free_json_meta DLL_EXPORT void tmq_free_json_meta(char* jsonMeta); DLL_EXPORT const char *tmq_get_topic_name(TAOS_RES *res); diff --git a/include/common/tmsg.h b/include/common/tmsg.h index 45be3a8507..ee19969e50 100644 --- a/include/common/tmsg.h +++ b/include/common/tmsg.h @@ -1977,6 +1977,7 @@ typedef struct SVCreateTbReq { union { struct { char* name; // super table name + uint8_t tagNum; tb_uid_t suid; SArray* tagName; uint8_t* pTag; diff --git a/source/client/src/clientEnv.c b/source/client/src/clientEnv.c index 5b96729503..893aa39ce3 100644 --- a/source/client/src/clientEnv.c +++ b/source/client/src/clientEnv.c @@ -244,6 +244,7 @@ void *createRequest(uint64_t connId, int32_t type) { STscObj *pTscObj = acquireTscObj(connId); if (pTscObj == NULL) { + taosMemoryFree(pRequest); terrno = TSDB_CODE_TSC_DISCONNECTED; return NULL; } diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 7093e14982..dbe1d9d4a4 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -834,7 +834,7 @@ void schedulerExecCb(SExecResult* pResult, void* param, int32_t code) { tstrerror(code), pRequest->requestId); STscObj* pTscObj = pRequest->pTscObj; - if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code)) { + if (code != TSDB_CODE_SUCCESS && NEED_CLIENT_HANDLE_ERROR(code) && pRequest->sqlstr != NULL) { tscDebug("0x%" PRIx64 " client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%" PRIx64, pRequest->self, code, tstrerror(code), pRequest->retry, pRequest->requestId); pRequest->prevCode = code; diff --git a/source/client/src/clientSml.c b/source/client/src/clientSml.c index ca45202547..bb42e47642 100644 --- a/source/client/src/clientSml.c +++ b/source/client/src/clientSml.c @@ -1489,7 +1489,7 @@ static SSmlHandle* smlBuildSmlInfo(STscObj* pTscObj, SRequestObj* request, SMLPr } info->id = smlGenId(); - info->pQuery = (SQuery *)taosMemoryCalloc(1, sizeof(SQuery)); + info->pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); if (NULL == info->pQuery) { uError("SML:0x%" PRIx64 " create info->pQuery error", info->id); goto cleanup; diff --git a/source/client/src/tmq.c b/source/client/src/tmq.c index 4c29adcdc0..74d5d4270b 100644 --- a/source/client/src/tmq.c +++ b/source/client/src/tmq.c @@ -2128,7 +2128,7 @@ _err: return string; } -static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id) { +static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* tagName, int64_t id, uint8_t tagNum) { char* string = NULL; SArray* pTagVals = NULL; cJSON* json = cJSON_CreateObject(); @@ -2148,6 +2148,8 @@ static char* buildCreateCTableJson(STag* pTag, char* sname, char* name, SArray* cJSON_AddItemToObject(json, "tableType", tableType); cJSON* using = cJSON_CreateString(sname); cJSON_AddItemToObject(json, "using", using); + cJSON* tagNumJson = cJSON_CreateNumber(tagNum); + cJSON_AddItemToObject(json, "tagNum", tagNumJson); // cJSON* version = cJSON_CreateNumber(1); // cJSON_AddItemToObject(json, "version", version); @@ -2236,7 +2238,7 @@ static char* processCreateTable(SMqMetaRsp* metaRsp) { pCreateReq = req.pReqs + iReq; if (pCreateReq->type == TSDB_CHILD_TABLE) { string = buildCreateCTableJson((STag*)pCreateReq->ctb.pTag, pCreateReq->ctb.name, pCreateReq->name, - pCreateReq->ctb.tagName, pCreateReq->uid); + pCreateReq->ctb.tagName, pCreateReq->uid, pCreateReq->ctb.tagNum); } else if (pCreateReq->type == TSDB_NORMAL_TABLE) { string = buildCreateTableJson(&pCreateReq->ntb.schemaRow, NULL, pCreateReq->name, pCreateReq->uid, TSDB_NORMAL_TABLE); @@ -2952,9 +2954,243 @@ int32_t taos_write_raw_meta(TAOS *taos, tmq_raw_data raw_meta){ return TSDB_CODE_INVALID_PARA; } -void tmq_free_raw_meta(tmq_raw_data* rawMeta) { - // - taosMemoryFreeClear(rawMeta); +typedef struct{ + SVgroupInfo vg; + void *data; +}VgData; + +static void destroyVgHash(void* data) { + VgData* vgData = (VgData*)data; + taosMemoryFreeClear(vgData->data); +} + +int32_t taos_write_raw_data(TAOS *taos, TAOS_RES *msg){ + if (!TD_RES_TMQ(msg)) { + uError("WriteRaw:msg is not tmq : %d", *(int8_t*)msg); + return TSDB_CODE_TMQ_INVALID_MSG; + } + + int32_t code = TSDB_CODE_SUCCESS; + SHashObj *pVgHash = NULL; + SQuery *pQuery = NULL; + + terrno = TSDB_CODE_SUCCESS; + SRequestObj* pRequest = (SRequestObj*)createRequest(*(int64_t*)taos, TSDB_SQL_INSERT); + if(!pRequest){ + uError("WriteRaw:createRequest error request is null"); + return terrno; + } + + if (!pRequest->pDb) { + uError("WriteRaw:not use db"); + code = TSDB_CODE_PAR_DB_NOT_SPECIFIED; + goto end; + } + + pVgHash = taosHashInit(16, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK); + taosHashSetFreeFp(pVgHash, destroyVgHash); + struct SCatalog *pCatalog = NULL; + code = catalogGetHandle(pRequest->pTscObj->pAppInfo->clusterId, &pCatalog); + if(code != TSDB_CODE_SUCCESS){ + uError("WriteRaw: get gatlog error"); + goto end; + } + + SRequestConnInfo conn = {0}; + conn.pTrans = pRequest->pTscObj->pAppInfo->pTransporter; + conn.requestId = pRequest->requestId; + conn.requestObjRefId = pRequest->self; + conn.mgmtEps = getEpSet_s(&pRequest->pTscObj->pAppInfo->mgmtEp); + SMqRspObj *rspObj = ((SMqRspObj*)msg); + printf("raw data block num:%d\n", rspObj->rsp.blockNum); + while (++rspObj->resIter < rspObj->rsp.blockNum) { + SRetrieveTableRsp* pRetrieve = (SRetrieveTableRsp*)taosArrayGetP(rspObj->rsp.blockData, rspObj->resIter); + if (!rspObj->rsp.withSchema) { + uError("WriteRaw:no schema, iter:%d", rspObj->resIter); + goto end; + } + SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj->rsp.blockSchema, rspObj->resIter); + setResSchemaInfo(&rspObj->resInfo, pSW->pSchema, pSW->nCols); + + code = setQueryResultFromRsp(&rspObj->resInfo, pRetrieve, false, false); + if(code != TSDB_CODE_SUCCESS){ + uError("WriteRaw: setQueryResultFromRsp error"); + goto end; + } + + uint16_t fLen = 0; + int32_t rowSize = 0; + int16_t nVar = 0; + for (int i = 0; i < pSW->nCols; i++) { + SSchema *schema = pSW->pSchema + i; + fLen += TYPE_BYTES[schema->type]; + rowSize += schema->bytes; + if(IS_VAR_DATA_TYPE(schema->type)){ + nVar ++; + } + } + + int32_t rows = rspObj->resInfo.numOfRows; + int32_t extendedRowSize = rowSize + TD_ROW_HEAD_LEN - sizeof(TSKEY) + nVar * sizeof(VarDataOffsetT) + + (int32_t)TD_BITMAP_BYTES(pSW->nCols - 1); + int32_t schemaLen = 0; + int32_t submitLen = sizeof(SSubmitBlk) + schemaLen + rows * extendedRowSize; + + const char* tbName = tmq_get_table_name(msg); + if(!tbName){ + uError("WriteRaw: tbname is null"); + code = TSDB_CODE_TMQ_INVALID_MSG; + goto end; + } + + printf("raw data tbname:%s\n", tbName); + SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; + strcpy(pName.dbname, pRequest->pDb); + strcpy(pName.tname, tbName); + + VgData vgData = {0}; + code = catalogGetTableHashVgroup(pCatalog, &conn, &pName, &(vgData.vg)); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableHashVgroup failed. table name: %s", tbName); + goto end; + } + + SSubmitReq* subReq = NULL; + SSubmitBlk* blk = NULL; + void *hData = taosHashGet(pVgHash, &vgData.vg.vgId, sizeof(vgData.vg.vgId)); + if(hData){ + vgData = *(VgData*)hData; + + int32_t totalLen = ((SSubmitReq*)(vgData.data))->length + submitLen; + void *tmp = taosMemoryRealloc(vgData.data, totalLen); + if (tmp == NULL) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto end; + } + vgData.data = tmp; + ((VgData*)hData)->data = tmp; + subReq = (SSubmitReq*)(vgData.data); + blk = POINTER_SHIFT(vgData.data, subReq->length); + }else{ + int32_t totalLen = sizeof(SSubmitReq) + submitLen; + void *tmp = taosMemoryCalloc(1, totalLen); + if (tmp == NULL) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto end; + } + vgData.data = tmp; + taosHashPut(pVgHash, (const char *)&vgData.vg.vgId, sizeof(vgData.vg.vgId), (char *)&vgData, sizeof(vgData)); + subReq = (SSubmitReq*)(vgData.data); + subReq->length = sizeof(SSubmitReq); + subReq->numOfBlocks = 0; + + blk = POINTER_SHIFT(vgData.data, sizeof(SSubmitReq)); + } + + STableMeta* pTableMeta = NULL; + code = catalogGetTableMeta(pCatalog, &conn, &pName, &pTableMeta); + if (code != TSDB_CODE_SUCCESS) { + uError("WriteRaw:catalogGetTableMeta failed. table name: %s", tbName); + goto end; + } + uint64_t suid = (TSDB_NORMAL_TABLE == pTableMeta->tableType ? 0 : pTableMeta->suid); + uint64_t uid = pTableMeta->uid; + taosMemoryFreeClear(pTableMeta); + + void* blkSchema = POINTER_SHIFT(blk, sizeof(SSubmitBlk)); + STSRow* rowData = POINTER_SHIFT(blkSchema, schemaLen); + + SRowBuilder rb = {0}; + tdSRowInit(&rb, pSW->version); + tdSRowSetTpInfo(&rb, pSW->nCols, fLen); + int32_t dataLen = 0; + + for (int32_t j = 0; j < rows; j++) { + tdSRowResetBuf(&rb, rowData); + + doSetOneRowPtr(&rspObj->resInfo); + rspObj->resInfo.current += 1; + + int32_t offset = 0; + for (int32_t k = 0; k < pSW->nCols; k++) { + const SSchema* pColumn = &pSW->pSchema[k]; + char *data = rspObj->resInfo.row[k]; + if (!data) { + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NULL, NULL, false, offset, k); + } else { + if(IS_VAR_DATA_TYPE(pColumn->type)){ + data -= VARSTR_HEADER_SIZE; + } + tdAppendColValToRow(&rb, pColumn->colId, pColumn->type, TD_VTYPE_NORM, data, true, offset, k); + } + offset += TYPE_BYTES[pColumn->type]; + } + int32_t rowLen = TD_ROW_LEN(rowData); + rowData = POINTER_SHIFT(rowData, rowLen); + dataLen += rowLen; + } + + blk->uid = htobe64(uid); + blk->suid = htobe64(suid); + blk->padding = htonl(blk->padding); + blk->sversion = htonl(pSW->version); + blk->schemaLen = htonl(schemaLen); + blk->numOfRows = htons(rows); + blk->dataLen = htonl(dataLen); + subReq->length += sizeof(SSubmitBlk) + schemaLen + dataLen; + subReq->numOfBlocks++; + } + + pQuery = (SQuery*)nodesMakeNode(QUERY_NODE_QUERY); + if (NULL == pQuery) { + uError("create SQuery error"); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + pQuery->execMode = QUERY_EXEC_MODE_SCHEDULE; + pQuery->haveResultSet = false; + pQuery->msgType = TDMT_VND_SUBMIT; + pQuery->pRoot = (SNode *)nodesMakeNode(QUERY_NODE_VNODE_MODIF_STMT); + if (NULL == pQuery->pRoot) { + uError("create pQuery->pRoot error"); + code = TSDB_CODE_OUT_OF_MEMORY; + goto end; + } + SVnodeModifOpStmt *nodeStmt = (SVnodeModifOpStmt *)(pQuery->pRoot); + nodeStmt->payloadType = PAYLOAD_TYPE_KV; + + int32_t numOfVg = taosHashGetSize(pVgHash); + nodeStmt->pDataBlocks = taosArrayInit(numOfVg, POINTER_BYTES); + + VgData *vData = (VgData *)taosHashIterate(pVgHash, NULL); + while (vData) { + SVgDataBlocks *dst = taosMemoryCalloc(1, sizeof(SVgDataBlocks)); + if (NULL == dst) { + code = TSDB_CODE_TSC_OUT_OF_MEMORY; + goto end; + } + dst->vg = vData->vg; + SSubmitReq* subReq = (SSubmitReq*)(vData->data); + dst->numOfTables = subReq->numOfBlocks; + dst->size = subReq->length; + dst->pData = (char*)subReq; + vData->data = NULL; // no need free + subReq->header.vgId = htonl(dst->vg.vgId); + subReq->version = htonl(1); + subReq->header.contLen = htonl(subReq->length); + subReq->length = htonl(subReq->length); + subReq->numOfBlocks = htonl(subReq->numOfBlocks); + taosArrayPush(nodeStmt->pDataBlocks, &dst); + vData = (VgData *)taosHashIterate(pVgHash, vData); + } + + launchQueryImpl(pRequest, pQuery, true, NULL); + code = pRequest->code; +end: + qDestroyQuery(pQuery); + destroyRequest(pRequest); + taosHashCleanup(pVgHash); + return code; } void tmq_commit_async(tmq_t* tmq, const TAOS_RES* msg, tmq_commit_cb* cb, void* param) { diff --git a/source/common/src/tmsg.c b/source/common/src/tmsg.c index 2a3452c534..c94c624f89 100644 --- a/source/common/src/tmsg.c +++ b/source/common/src/tmsg.c @@ -4981,6 +4981,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) { if (pReq->type == TSDB_CHILD_TABLE) { if (tEncodeCStr(pCoder, pReq->ctb.name) < 0) return -1; + if (tEncodeU8(pCoder, pReq->ctb.tagNum) < 0) return -1; if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1; if (tEncodeTag(pCoder, (const STag *)pReq->ctb.pTag) < 0) return -1; int32_t len = taosArrayGetSize(pReq->ctb.tagName); @@ -5017,6 +5018,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) { if (pReq->type == TSDB_CHILD_TABLE) { if (tDecodeCStr(pCoder, &pReq->ctb.name) < 0) return -1; + if (tDecodeU8(pCoder, &pReq->ctb.tagNum) < 0) return -1; if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1; if (tDecodeTag(pCoder, (STag **)&pReq->ctb.pTag) < 0) return -1; int32_t len = 0; diff --git a/source/dnode/vnode/src/tq/tqMeta.c b/source/dnode/vnode/src/tq/tqMeta.c index d282036fe3..ab1089e7a4 100644 --- a/source/dnode/vnode/src/tq/tqMeta.c +++ b/source/dnode/vnode/src/tq/tqMeta.c @@ -67,10 +67,10 @@ int32_t tqMetaOpen(STQ* pTq) { ASSERT(0); } - void* pKey; - int kLen; - void* pVal; - int vLen; + void* pKey = NULL; + int kLen = 0; + void* pVal = NULL; + int vLen = 0; tdbTbcMoveToFirst(pCur); SDecoder decoder; diff --git a/source/libs/parser/src/parInsert.c b/source/libs/parser/src/parInsert.c index 8fa8fd4cf8..702422e022 100644 --- a/source/libs/parser/src/parInsert.c +++ b/source/libs/parser/src/parInsert.c @@ -739,12 +739,12 @@ static int32_t parseBoundColumns(SInsertParseContext* pCxt, SParsedDataColInfo* return TSDB_CODE_SUCCESS; } -static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname, - SArray* tagName) { +static void buildCreateTbReq(SVCreateTbReq* pTbReq, const char* tname, STag* pTag, int64_t suid, const char* sname, SArray* tagName, uint8_t tagNum) { pTbReq->type = TD_CHILD_TABLE; pTbReq->name = strdup(tname); pTbReq->ctb.suid = suid; - if (sname) pTbReq->ctb.name = strdup(sname); + pTbReq->ctb.tagNum = tagNum; + if(sname) pTbReq->ctb.name = strdup(sname); pTbReq->ctb.pTag = (uint8_t*)pTag; pTbReq->ctb.tagName = taosArrayDup(tagName); pTbReq->commentLen = -1; @@ -903,7 +903,7 @@ static int32_t parseTagToken(char** end, SToken* pToken, SSchema* pSchema, int16 if (pToken->n + VARSTR_HEADER_SIZE > pSchema->bytes) { return generateSyntaxErrMsg(pMsgBuf, TSDB_CODE_PAR_VALUE_TOO_LONG, pSchema->name); } - val->pData = pToken->z; + val->pData = strdup(pToken->z); val->nData = pToken->n; break; } @@ -969,10 +969,9 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint } SSchema* pTagSchema = &pSchema[pCxt->tags.boundColumns[i]]; - char* tmpTokenBuf = taosMemoryCalloc(1, sToken.n); // todo this can be optimize with parse column + char tmpTokenBuf[TSDB_MAX_BYTES_PER_ROW] = {0}; // todo this can be optimize with parse column code = checkAndTrimValue(&sToken, tmpTokenBuf, &pCxt->msg); if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(tmpTokenBuf); goto end; } @@ -982,7 +981,6 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint if (pTagSchema->type == TSDB_DATA_TYPE_JSON) { if (sToken.n > (TSDB_MAX_JSON_TAG_LEN - VARSTR_HEADER_SIZE) / TSDB_NCHAR_SIZE) { code = buildSyntaxErrMsg(&pCxt->msg, "json string too long than 4095", sToken.z); - taosMemoryFree(tmpTokenBuf); goto end; } if (isNullValue(pTagSchema->type, &sToken)) { @@ -990,7 +988,6 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint } else { code = parseJsontoTagData(sToken.z, pTagVals, &pTag, &pCxt->msg); } - taosMemoryFree(tmpTokenBuf); if (code != TSDB_CODE_SUCCESS) { goto end; } @@ -999,12 +996,9 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint STagVal val = {0}; code = parseTagToken(&pCxt->pSql, &sToken, pTagSchema, precision, &val, &pCxt->msg); if (TSDB_CODE_SUCCESS != code) { - taosMemoryFree(tmpTokenBuf); goto end; } - if (pTagSchema->type != TSDB_DATA_TYPE_BINARY) { - taosMemoryFree(tmpTokenBuf); - } + taosArrayPush(pTagVals, &val); } } @@ -1018,12 +1012,12 @@ static int32_t parseTagsClause(SInsertParseContext* pCxt, SSchema* pSchema, uint goto end; } - buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName); + buildCreateTbReq(&pCxt->createTblReq, tName, pTag, pCxt->pTableMeta->suid, pCxt->sTableName, tagName, pCxt->pTableMeta->tableInfo.numOfTags); end: for (int i = 0; i < taosArrayGetSize(pTagVals); ++i) { STagVal* p = (STagVal*)taosArrayGet(pTagVals, i); - if (p->type == TSDB_DATA_TYPE_NCHAR) { + if (IS_VAR_DATA_TYPE(p->type)) { taosMemoryFree(p->pData); } } @@ -1902,7 +1896,7 @@ int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, const ch } SVCreateTbReq tbReq = {0}; - buildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName); + buildCreateTbReq(&tbReq, tName, pTag, suid, sTableName, tagName, pDataBlock->pTableMeta->tableInfo.numOfTags); code = buildCreateTbMsg(pDataBlock, &tbReq); tdDestroySVCreateTbReq(&tbReq); @@ -2338,7 +2332,7 @@ int32_t smlBindData(void* handle, SArray* tags, SArray* colsSchema, SArray* cols return ret; } - buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName); + buildCreateTbReq(&smlHandle->tableExecHandle.createTblReq, tableName, pTag, pTableMeta->suid, NULL, tagName, pTableMeta->tableInfo.numOfTags); taosArrayDestroy(tagName); smlHandle->tableExecHandle.createTblReq.ctb.name = taosMemoryMalloc(sTableNameLen + 1); diff --git a/source/libs/parser/src/parTranslater.c b/source/libs/parser/src/parTranslater.c index 93fffee956..3930e46054 100644 --- a/source/libs/parser/src/parTranslater.c +++ b/source/libs/parser/src/parTranslater.c @@ -5559,7 +5559,7 @@ static int32_t rewriteCreateTable(STranslateContext* pCxt, SQuery* pQuery) { static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, SCreateSubTableClause* pStmt, const STag* pTag, uint64_t suid, const char* sTableNmae, SVgroupInfo* pVgInfo, - SArray* tagName) { + SArray* tagName, uint8_t tagNum) { // char dbFName[TSDB_DB_FNAME_LEN] = {0}; // SName name = {.type = TSDB_DB_NAME_T, .acctId = acctId}; // strcpy(name.dbname, pStmt->dbName); @@ -5576,6 +5576,7 @@ static void addCreateTbReqIntoVgroup(int32_t acctId, SHashObj* pVgroupHashmap, S req.commentLen = -1; } req.ctb.suid = suid; + req.ctb.tagNum = tagNum; req.ctb.name = strdup(sTableNmae); req.ctb.pTag = (uint8_t*)pTag; req.ctb.tagName = taosArrayDup(tagName); @@ -5827,7 +5828,7 @@ static int32_t rewriteCreateSubTable(STranslateContext* pCxt, SCreateSubTableCla } if (TSDB_CODE_SUCCESS == code) { addCreateTbReqIntoVgroup(pCxt->pParseCxt->acctId, pVgroupHashmap, pStmt, pTag, pSuperTableMeta->uid, - pStmt->useTableName, &info, tagName); + pStmt->useTableName, &info, tagName, pSuperTableMeta->tableInfo.numOfTags); } taosArrayDestroy(tagName);