From 8b7ca94295ab987b3bcacb89bdf8e80d47b174a5 Mon Sep 17 00:00:00 2001 From: Liu Jicong Date: Tue, 8 Feb 2022 10:20:41 +0800 Subject: [PATCH] remove tmqTest out of clientTest --- example/src/tmq.c | 10 +- source/client/src/clientImpl.c | 3 +- source/client/test/CMakeLists.txt | 14 ++- source/client/test/clientTests.cpp | 113 ------------------- source/client/test/tmqTest.cpp | 154 ++++++++++++++++++++++++++ source/dnode/mnode/impl/inc/mndDef.h | 2 +- source/libs/qworker/src/qworker.c | 4 +- source/libs/scheduler/src/scheduler.c | 4 +- 8 files changed, 179 insertions(+), 125 deletions(-) create mode 100644 source/client/test/tmqTest.cpp diff --git a/example/src/tmq.c b/example/src/tmq.c index 31bfe4197f..64b631159b 100644 --- a/example/src/tmq.c +++ b/example/src/tmq.c @@ -135,30 +135,30 @@ void basic_consume_loop(tmq_t *tmq, fprintf(stderr, "%% Consumer closed\n"); } -void sync_consume_loop(tmq_t *rk, +void sync_consume_loop(tmq_t *tmq, tmq_list_t *topics) { static const int MIN_COMMIT_COUNT = 1000; int msg_count = 0; tmq_resp_err_t err; - if ((err = tmq_subscribe(rk, topics))) { + if ((err = tmq_subscribe(tmq, topics))) { fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(err)); return; } while (running) { - tmq_message_t *tmqmessage = tmq_consumer_poll(rk, 500); + tmq_message_t *tmqmessage = tmq_consumer_poll(tmq, 500); if (tmqmessage) { msg_process(tmqmessage); tmq_message_destroy(tmqmessage); if ((++msg_count % MIN_COMMIT_COUNT) == 0) - tmq_commit(rk, NULL, 0); + tmq_commit(tmq, NULL, 0); } } - err = tmq_consumer_close(rk); + err = tmq_consumer_close(tmq); if (err) fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(err)); else diff --git a/source/client/src/clientImpl.c b/source/client/src/clientImpl.c index 8332f18512..e04a9cc81d 100644 --- a/source/client/src/clientImpl.c +++ b/source/client/src/clientImpl.c @@ -105,8 +105,9 @@ TAOS *taos_connect_internal(const char *ip, const char *user, const char *pass, pthread_mutex_lock(&appInfo.mutex); pInst = taosHashGet(appInfo.pInstMap, key, strlen(key)); + SAppInstInfo* p = NULL; if (pInst == NULL) { - SAppInstInfo* p = calloc(1, sizeof(struct SAppInstInfo)); + p = calloc(1, sizeof(struct SAppInstInfo)); p->mgmtEp = epSet; p->pTransporter = openTransporter(user, secretEncrypt, tsNumOfCores); /*p->pAppHbMgr = appHbMgrInit(p, key);*/ diff --git a/source/client/test/CMakeLists.txt b/source/client/test/CMakeLists.txt index 3614e4364b..ee5109860e 100644 --- a/source/client/test/CMakeLists.txt +++ b/source/client/test/CMakeLists.txt @@ -5,14 +5,26 @@ MESSAGE(STATUS "build parser unit test") SET(CMAKE_CXX_STANDARD 11) AUX_SOURCE_DIRECTORY(${CMAKE_CURRENT_SOURCE_DIR} SOURCE_LIST) -ADD_EXECUTABLE(clientTest ${SOURCE_LIST}) +ADD_EXECUTABLE(clientTest clientTests.cpp) TARGET_LINK_LIBRARIES( clientTest PUBLIC os util common transport parser catalog scheduler function gtest taos qcom ) +ADD_EXECUTABLE(tmqTest tmqTest.cpp) +TARGET_LINK_LIBRARIES( + tmqTest + PUBLIC os util common transport parser catalog scheduler function gtest taos qcom +) + TARGET_INCLUDE_DIRECTORIES( clientTest PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/client/" PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/client/inc" ) + +TARGET_INCLUDE_DIRECTORIES( + tmqTest + PUBLIC "${CMAKE_SOURCE_DIR}/include/libs/client/" + PRIVATE "${CMAKE_SOURCE_DIR}/source/libs/client/inc" +) diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index 553dacafdd..1e68faa4f4 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -564,119 +564,6 @@ TEST(testCase, insert_test) { } #endif -TEST(testCase, create_topic_ctb_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - } - //taos_free_result(pRes); - - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - ASSERT_TRUE(pFields == nullptr); - - int32_t numOfFields = taos_num_fields(pRes); - ASSERT_EQ(numOfFields, 0); - - taos_free_result(pRes); - - char* sql = "select * from tu"; - pRes = tmq_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql)); - taos_free_result(pRes); - taos_close(pConn); -} - -TEST(testCase, create_topic_stb_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - } - //taos_free_result(pRes); - - TAOS_FIELD* pFields = taos_fetch_fields(pRes); - ASSERT_TRUE(pFields == nullptr); - - int32_t numOfFields = taos_num_fields(pRes); - ASSERT_EQ(numOfFields, 0); - - taos_free_result(pRes); - - char* sql = "select * from st1"; - pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); - taos_free_result(pRes); - taos_close(pConn); -} - -#if 0 -TEST(testCase, tmq_subscribe_ctb_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - tmq_conf_t* conf = tmq_conf_new(); - tmq_conf_set(conf, "group.id", "tg1"); - tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); - - tmq_list_t* topic_list = tmq_list_new(); - tmq_list_append(topic_list, "test_ctb_topic_1"); - tmq_subscribe(tmq, topic_list); - - while (1) { - tmq_message_t* msg = tmq_consumer_poll(tmq, 1000); - tmq_message_destroy(msg); - //printf("get msg\n"); - //if (msg == NULL) break; - } -} -#endif - -TEST(testCase, tmq_subscribe_stb_Test) { - TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); - assert(pConn != NULL); - - TAOS_RES* pRes = taos_query(pConn, "use abc1"); - if (taos_errno(pRes) != 0) { - printf("error in use db, reason:%s\n", taos_errstr(pRes)); - } - taos_free_result(pRes); - - tmq_conf_t* conf = tmq_conf_new(); - tmq_conf_set(conf, "group.id", "tg2"); - tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); - - tmq_list_t* topic_list = tmq_list_new(); - tmq_list_append(topic_list, "test_stb_topic_1"); - tmq_subscribe(tmq, topic_list); - - int cnt = 1; - while (1) { - tmq_message_t* msg = tmq_consumer_poll(tmq, 1000); - if (msg == NULL) continue; - tmqShowMsg(msg); - if (cnt++ % 10 == 0){ - tmq_commit(tmq, NULL, 0); - } - //tmq_commit(tmq, NULL, 0); - tmq_message_destroy(msg); - //printf("get msg\n"); - } -} - -TEST(testCase, tmq_consume_Test) { -} - -TEST(testCase, tmq_commit_TEST) { -} #if 0 TEST(testCase, projection_query_tables) { diff --git a/source/client/test/tmqTest.cpp b/source/client/test/tmqTest.cpp new file mode 100644 index 0000000000..f767e7faef --- /dev/null +++ b/source/client/test/tmqTest.cpp @@ -0,0 +1,154 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include + +#include +#include +#include + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wwrite-strings" +#pragma GCC diagnostic ignored "-Wunused-function" +#pragma GCC diagnostic ignored "-Wunused-variable" +#pragma GCC diagnostic ignored "-Wsign-compare" + +#include "../inc/clientInt.h" +#include "taos.h" + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} + +TEST(testCase, driverInit_Test) { + taosInitGlobalCfg(); +// taos_init(); +} + +TEST(testCase, create_topic_ctb_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + //taos_free_result(pRes); + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == nullptr); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + + char* sql = "select * from tu"; + pRes = tmq_create_topic(pConn, "test_ctb_topic_1", sql, strlen(sql)); + taos_free_result(pRes); + taos_close(pConn); +} + +TEST(testCase, create_topic_stb_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + //taos_free_result(pRes); + + TAOS_FIELD* pFields = taos_fetch_fields(pRes); + ASSERT_TRUE(pFields == nullptr); + + int32_t numOfFields = taos_num_fields(pRes); + ASSERT_EQ(numOfFields, 0); + + taos_free_result(pRes); + + char* sql = "select * from st1"; + pRes = tmq_create_topic(pConn, "test_stb_topic_1", sql, strlen(sql)); + taos_free_result(pRes); + taos_close(pConn); +} + +#if 0 +TEST(testCase, tmq_subscribe_ctb_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg1"); + tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); + + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "test_ctb_topic_1"); + tmq_subscribe(tmq, topic_list); + + while (1) { + tmq_message_t* msg = tmq_consumer_poll(tmq, 1000); + tmq_message_destroy(msg); + //printf("get msg\n"); + //if (msg == NULL) break; + } +} + +TEST(testCase, tmq_subscribe_stb_Test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + assert(pConn != NULL); + + TAOS_RES* pRes = taos_query(pConn, "use abc1"); + if (taos_errno(pRes) != 0) { + printf("error in use db, reason:%s\n", taos_errstr(pRes)); + } + taos_free_result(pRes); + + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "tg2"); + tmq_t* tmq = tmq_consumer_new(pConn, conf, NULL, 0); + + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "test_stb_topic_1"); + tmq_subscribe(tmq, topic_list); + + int cnt = 1; + while (1) { + tmq_message_t* msg = tmq_consumer_poll(tmq, 1000); + if (msg == NULL) continue; + tmqShowMsg(msg); + if (cnt++ % 10 == 0){ + tmq_commit(tmq, NULL, 0); + } + //tmq_commit(tmq, NULL, 0); + tmq_message_destroy(msg); + //printf("get msg\n"); + } +} + +TEST(testCase, tmq_consume_Test) { +} + +TEST(testCase, tmq_commit_Test) { +} + +#endif diff --git a/source/dnode/mnode/impl/inc/mndDef.h b/source/dnode/mnode/impl/inc/mndDef.h index 75f90df658..e8a9a68466 100644 --- a/source/dnode/mnode/impl/inc/mndDef.h +++ b/source/dnode/mnode/impl/inc/mndDef.h @@ -617,7 +617,7 @@ typedef struct SMqTopicObj { int64_t createTime; int64_t updateTime; uint64_t uid; - uint64_t dbUid; + int64_t dbUid; int32_t version; SRWLatch lock; int32_t sqlLen; diff --git a/source/libs/qworker/src/qworker.c b/source/libs/qworker/src/qworker.c index 8705bc23b9..cebe8178d9 100644 --- a/source/libs/qworker/src/qworker.c +++ b/source/libs/qworker/src/qworker.c @@ -561,7 +561,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void QW_ERR_RET(qwUpdateTaskStatus(QW_FPARAMS(), JOB_TASK_STATUS_SUCCEED)); - QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); + QW_ERR_RET(qwMallocFetchRsp(len, &rsp)); *rspMsg = rsp; *dataLen = 0; @@ -573,7 +573,7 @@ int32_t qwGetResFromSink(QW_FPARAMS_DEF, SQWTaskCtx *ctx, int32_t *dataLen, void QW_TASK_DLOG("no res data in sink, need response later, queryEnd:%d", queryEnd); return TSDB_CODE_SUCCESS; - } + } // Got data from sink diff --git a/source/libs/scheduler/src/scheduler.c b/source/libs/scheduler/src/scheduler.c index 25137beed9..662df1896b 100644 --- a/source/libs/scheduler/src/scheduler.c +++ b/source/libs/scheduler/src/scheduler.c @@ -1409,7 +1409,7 @@ int32_t schedulerAsyncExecJob(void *transport, SArray *pNodeList, SQueryDag* pDa } int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { - if (NULL == pDag || pDag->numOfSubplans <= 0 || taosArrayGetSize(pDag->pSubplans) <= 0) { + if (NULL == pDag || pDag->numOfSubplans <= 0 || taosArrayGetSize(pDag->pSubplans) == 0) { SCH_ERR_RET(TSDB_CODE_QRY_INVALID_INPUT); } @@ -1454,7 +1454,6 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { } SSubQueryMsg* pMsg = calloc(1, msgSize); - memcpy(pMsg->msg, msg, msgLen); pMsg->header.vgId = tInfo.addr.nodeId; @@ -1464,6 +1463,7 @@ int32_t schedulerConvertDagToTaskList(SQueryDag* pDag, SArray **pTasks) { pMsg->taskType = TASK_TYPE_PERSISTENT; pMsg->phyLen = msgLen; pMsg->sqlLen = 0; + memcpy(pMsg->msg, msg, msgLen); /*memcpy(pMsg->msg, ((SSubQueryMsg*)msg)->msg, msgLen);*/ tInfo.msg = pMsg;