From 7d915626c4cfa9f855912b9e5b610328ddb419f2 Mon Sep 17 00:00:00 2001 From: Haojun Liao Date: Fri, 17 Feb 2023 23:06:19 +0800 Subject: [PATCH] refactor: do some internal refactor and fix race condition. --- source/client/src/clientTmq.c | 143 +++++++++++++++++------------ source/client/test/clientTests.cpp | 139 +++++++++++++++++++++------- 2 files changed, 192 insertions(+), 90 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 22739108e2..d33f78d29d 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -32,15 +32,15 @@ sem_post(x) #endif -int32_t tmqAskEp(tmq_t* tmq, bool async); - -typedef struct { +struct SMqMgmt{ int8_t inited; tmr_h timer; int32_t rsetId; -} SMqMgmt; +}; -static SMqMgmt tmqMgmt = {0}; +static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once +volatile int32_t tmqInitRes = 0; // initialize rsp code +static struct SMqMgmt tmqMgmt = {0}; typedef struct { int8_t tmqRspType; @@ -65,8 +65,7 @@ struct tmq_conf_t { int8_t withTbName; int8_t snapEnable; int32_t snapBatchSize; - - bool hbBgEnable; + bool hbBgEnable; uint16_t port; int32_t autoCommitInterval; @@ -78,18 +77,17 @@ struct tmq_conf_t { }; struct tmq_t { - int64_t refId; + int64_t refId; // conf - char groupId[TSDB_CGROUP_LEN]; - char clientId[256]; - int8_t withTbName; - int8_t useSnapshot; - int8_t autoCommit; - int32_t autoCommitInterval; - int32_t resetOffsetCfg; - int64_t consumerId; - - bool hbBgEnable; + char groupId[TSDB_CGROUP_LEN]; + char clientId[256]; + int8_t withTbName; + int8_t useSnapshot; + int8_t autoCommit; + int32_t autoCommitInterval; + int32_t resetOffsetCfg; + uint64_t consumerId; + bool hbBgEnable; tmq_commit_cb* commitCb; void* commitCbUserParam; @@ -221,13 +219,21 @@ typedef struct { /*int32_t vgId;*/ } SMqCommitCbParam; +static int32_t tmqAskEp(tmq_t* tmq, bool async); + tmq_conf_t* tmq_conf_new() { tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t)); + if (conf == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return conf; + } + conf->withTbName = false; conf->autoCommit = true; conf->autoCommitInterval = 5000; conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST; conf->hbBgEnable = true; + return conf; } @@ -932,31 +938,37 @@ void tmqFreeImpl(void* handle) { taosMemoryFree(tmq); } +static void tmqMgmtInit(void) { + tmqInitRes = 0; + tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ"); + + if (tmqMgmt.timer == NULL) { + tmqInitRes = TSDB_CODE_OUT_OF_MEMORY; + } + + tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl); + if (tmqMgmt.rsetId != 0) { + tmqInitRes = terrno; + } +} + tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { - // init timer - int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1); - if (inited == 0) { - tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ"); - if (tmqMgmt.timer == NULL) { - atomic_store_8(&tmqMgmt.inited, 0); - terrno = TSDB_CODE_OUT_OF_MEMORY; - return NULL; - } - tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl); + taosThreadOnce(&tmqInit, tmqMgmtInit); + if (tmqInitRes != 0) { + terrno = tmqInitRes; + return NULL; } tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t)); if (pTmq == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tscError("setting up new consumer failed since %s, consumer group %s", terrstr(), conf->groupId); + tscError("failed to create consumer, consumer group %s, code:%s", conf->groupId, terrstr()); return NULL; } const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user; const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass; - ASSERT(user); - ASSERT(pass); ASSERT(conf->groupId[0]); pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic)); @@ -966,7 +978,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) { terrno = TSDB_CODE_OUT_OF_MEMORY; - tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), + tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId); goto FAIL; } @@ -996,7 +1008,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { // init semaphore if (tsem_init(&pTmq->rspSem, 0, 0) != 0) { - tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), + tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId); goto FAIL; } @@ -1004,7 +1016,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { // init connection pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ); if (pTmq->pTscObj == NULL) { - tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), + tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(), pTmq->groupId); tsem_destroy(&pTmq->rspSem); goto FAIL; @@ -1022,8 +1034,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer); } - tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId); - + tscInfo("consumer:0x%" PRIx64 " is setup, consumer groupId %s", pTmq->consumerId, pTmq->groupId); return pTmq; FAIL: @@ -1032,6 +1043,7 @@ FAIL: if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask); if (pTmq->qall) taosFreeQall(pTmq->qall); taosMemoryFree(pTmq); + return NULL; } @@ -1041,44 +1053,52 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { void* buf = NULL; SMsgSendInfo* sendInfo = NULL; SCMSubscribeReq req = {0}; - int32_t code = -1; + int32_t code = 0; - tscDebug("tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz); + tscDebug("consumer:0x%"PRIx64", tmq subscribe start, numOfTopic %d", tmq->consumerId, sz); req.consumerId = tmq->consumerId; tstrncpy(req.clientId, tmq->clientId, 256); tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN); req.topicNames = taosArrayInit(sz, sizeof(void*)); - if (req.topicNames == NULL) goto FAIL; - tscDebug("tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz); + if (req.topicNames == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } for (int32_t i = 0; i < sz; i++) { char* topic = taosArrayGetP(container, i); SName name = {0}; tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic)); - char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN); if (topicFName == NULL) { goto FAIL; } - tNameExtractFullName(&name, topicFName); - tscDebug("subscribe topic: %s", topicFName); + tNameExtractFullName(&name, topicFName); + tscDebug("consumer:0x%"PRIx64", subscribe topic: %s", tmq->consumerId, topicFName); taosArrayPush(req.topicNames, &topicFName); } int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req); + buf = taosMemoryMalloc(tlen); - if (buf == NULL) goto FAIL; + if (buf == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } void* abuf = buf; tSerializeSCMSubscribeReq(&abuf, &req); sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (sendInfo == NULL) goto FAIL; + if (sendInfo == NULL) { + code = TSDB_CODE_OUT_OF_MEMORY; + goto FAIL; + } SMqSubscribeCbParam param = { .rspErr = 0, @@ -1086,7 +1106,9 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { .epoch = tmq->epoch, }; - if (tsem_init(¶m.rspSem, 0, 0) != 0) goto FAIL; + if (tsem_init(¶m.rspSem, 0, 0) != 0) { + goto FAIL; + } sendInfo->msgInfo = (SDataBuf){ .pData = buf, @@ -1112,15 +1134,18 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { tsem_wait(¶m.rspSem); tsem_destroy(¶m.rspSem); - code = param.rspErr; - if (code != 0) goto FAIL; + if (param.rspErr != 0) { + code = param.rspErr; + goto FAIL; + } int32_t retryCnt = 0; while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) { if (retryCnt++ > 10) { goto FAIL; } - tscDebug("consumer not ready, retry"); + + tscDebug("consumer:0x%"PRIx64", mnd not ready for subscribe, retry count:%d in 500ms", tmq->consumerId, retryCnt); taosMsleep(500); } @@ -1138,7 +1163,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer); } - code = 0; FAIL: taosArrayDestroyP(req.topicNames, taosMemoryFree); taosMemoryFree(buf); @@ -1434,7 +1458,7 @@ END: } int32_t tmqAskEp(tmq_t* tmq, bool async) { - int32_t code = 0; + int32_t code = TSDB_CODE_SUCCESS; #if 0 int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1); if (epStatus == 1) { @@ -1444,6 +1468,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { } atomic_store_32(&tmq->epSkipCnt, 0); #endif + SMqAskEpReq req = {0}; req.consumerId = tmq->consumerId; req.epoch = tmq->epoch; @@ -1451,27 +1476,31 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req); if (tlen < 0) { - tscError("tSerializeSMqAskEpReq failed"); + tscError("consumer:0x%"PRIx64", tSerializeSMqAskEpReq failed", tmq->consumerId); return -1; } + void* pReq = taosMemoryCalloc(1, tlen); if (pReq == NULL) { - tscError("failed to malloc askEpReq msg, size:%d", tlen); + tscError("consumer:0x%"PRIx64", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen); + terrno = TSDB_CODE_OUT_OF_MEMORY; return -1; } + if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) { - tscError("tSerializeSMqAskEpReq %d failed", tlen); + tscError("consumer:0x%"PRIx64", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen); taosMemoryFree(pReq); return -1; } SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); if (pParam == NULL) { - tscError("failed to malloc subscribe param"); + tscError("consumer:0x%"PRIx64", failed to malloc subscribe param", tmq->consumerId); taosMemoryFree(pReq); /*atomic_store_8(&tmq->epStatus, 0);*/ return -1; } + pParam->refId = tmq->refId; pParam->epoch = tmq->epoch; pParam->async = async; @@ -1499,8 +1528,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp); - - tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId); + tscDebug("consumer:0x%" PRIx64 ", ask ep from mnode", tmq->consumerId); int64_t transporterId = 0; asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo); @@ -1510,6 +1538,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) { code = pParam->code; taosMemoryFree(pParam); } + return code; } diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index a75411a854..9b777f05c0 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -162,6 +162,11 @@ void *queryThread(void *arg) { } static int32_t numOfThreads = 1; + +void tmq_commit_cb_print(tmq_t *pTmq, int32_t code, void *param) { + printf("success, code:%d\n", code); +} + } // namespace int main(int argc, char** argv) { @@ -176,12 +181,12 @@ int main(int argc, char** argv) { return RUN_ALL_TESTS(); } -TEST(testCase, driverInit_Test) { +TEST(clientCase, driverInit_Test) { // taosInitGlobalCfg(); // taos_init(); } -TEST(testCase, connect_Test) { +TEST(clientCase, connect_Test) { taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); @@ -190,8 +195,8 @@ TEST(testCase, connect_Test) { } taos_close(pConn); } -#if 0 -TEST(testCase, create_user_Test) { + +TEST(clientCase, create_user_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -204,7 +209,7 @@ TEST(testCase, create_user_Test) { taos_close(pConn); } -TEST(testCase, create_account_Test) { +TEST(clientCase, create_account_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -217,7 +222,7 @@ TEST(testCase, create_account_Test) { taos_close(pConn); } -TEST(testCase, drop_account_Test) { +TEST(clientCase, drop_account_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -230,7 +235,7 @@ TEST(testCase, drop_account_Test) { taos_close(pConn); } -TEST(testCase, show_user_Test) { +TEST(clientCase, show_user_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -250,7 +255,7 @@ TEST(testCase, show_user_Test) { taos_close(pConn); } -TEST(testCase, drop_user_Test) { +TEST(clientCase, drop_user_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -263,7 +268,7 @@ TEST(testCase, drop_user_Test) { taos_close(pConn); } -TEST(testCase, show_db_Test) { +TEST(clientCase, show_db_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -282,7 +287,7 @@ TEST(testCase, show_db_Test) { taos_close(pConn); } -TEST(testCase, create_db_Test) { +TEST(clientCase, create_db_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -306,7 +311,7 @@ TEST(testCase, create_db_Test) { taos_close(pConn); } -TEST(testCase, create_dnode_Test) { +TEST(clientCase, create_dnode_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -325,7 +330,7 @@ TEST(testCase, create_dnode_Test) { taos_close(pConn); } -TEST(testCase, drop_dnode_Test) { +TEST(clientCase, drop_dnode_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -349,7 +354,7 @@ TEST(testCase, drop_dnode_Test) { taos_close(pConn); } -TEST(testCase, use_db_test) { +TEST(clientCase, use_db_test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -367,7 +372,7 @@ TEST(testCase, use_db_test) { taos_close(pConn); } -// TEST(testCase, drop_db_test) { +// TEST(clientCase, drop_db_test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != NULL); // @@ -389,7 +394,7 @@ TEST(testCase, use_db_test) { // taos_close(pConn); //} -TEST(testCase, create_stable_Test) { +TEST(clientCase, create_stable_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -428,7 +433,7 @@ TEST(testCase, create_stable_Test) { taos_close(pConn); } -TEST(testCase, create_table_Test) { +TEST(clientCase, create_table_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -447,7 +452,7 @@ TEST(testCase, create_table_Test) { taos_close(pConn); } -TEST(testCase, create_ctable_Test) { +TEST(clientCase, create_ctable_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -472,7 +477,7 @@ TEST(testCase, create_ctable_Test) { taos_close(pConn); } -TEST(testCase, show_stable_Test) { +TEST(clientCase, show_stable_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != nullptr); @@ -497,7 +502,7 @@ TEST(testCase, show_stable_Test) { taos_close(pConn); } -TEST(testCase, show_vgroup_Test) { +TEST(clientCase, show_vgroup_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -529,7 +534,7 @@ TEST(testCase, show_vgroup_Test) { taos_close(pConn); } -TEST(testCase, create_multiple_tables) { +TEST(clientCase, create_multiple_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -600,7 +605,7 @@ TEST(testCase, create_multiple_tables) { taos_close(pConn); } -TEST(testCase, show_table_Test) { +TEST(clientCase, show_table_Test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); assert(pConn != NULL); @@ -634,7 +639,7 @@ TEST(testCase, show_table_Test) { taos_close(pConn); } -//TEST(testCase, drop_stable_Test) { +//TEST(clientCase, drop_stable_Test) { // TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); // assert(pConn != nullptr); // @@ -659,7 +664,7 @@ TEST(testCase, show_table_Test) { // taos_close(pConn); //} -TEST(testCase, generated_request_id_test) { +TEST(clientCase, generated_request_id_test) { SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK); for (int32_t i = 0; i < 50000; ++i) { @@ -675,7 +680,7 @@ TEST(testCase, generated_request_id_test) { taosHashCleanup(phash); } -TEST(testCase, insert_test) { +TEST(clientCase, insert_test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -692,9 +697,8 @@ TEST(testCase, insert_test) { taos_free_result(pRes); taos_close(pConn); } -#endif -TEST(testCase, projection_query_tables) { +TEST(clientCase, projection_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -752,8 +756,7 @@ TEST(testCase, projection_query_tables) { taos_close(pConn); } -#if 0 -TEST(testCase, tsbs_perf_test) { +TEST(clientCase, tsbs_perf_test) { TdThread qid[20] = {0}; for(int32_t i = 0; i < numOfThreads; ++i) { @@ -762,7 +765,7 @@ TEST(testCase, tsbs_perf_test) { getchar(); } -TEST(testCase, projection_query_stables) { +TEST(clientCase, projection_query_stables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -790,7 +793,7 @@ TEST(testCase, projection_query_stables) { taos_close(pConn); } -TEST(testCase, agg_query_tables) { +TEST(clientCase, agg_query_tables) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -825,7 +828,7 @@ create table tm1 using m1 tags(2); insert into tm0 values('2021-1-1 1:1:1.120', 1) ('2021-1-1 1:1:2.9', 2) tm1 values('2021-1-1 1:1:1.120', 11) ('2021-1-1 1:1:2.99', 22); */ -TEST(testCase, async_api_test) { +TEST(clientCase, async_api_test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -859,7 +862,7 @@ TEST(testCase, async_api_test) { taos_close(pConn); } -TEST(testCase, update_test) { +TEST(clientCase, update_test) { TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); ASSERT_NE(pConn, nullptr); @@ -895,6 +898,76 @@ TEST(testCase, update_test) { } } -#endif +TEST(clientCase, subscription_test) { + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + // TAOS_RES* pRes = taos_query(pConn, "create topic topic_t1 as select * from t1"); + // if (taos_errno(pRes) != TSDB_CODE_SUCCESS) { + // printf("failed to create topic, code:%s", taos_errstr(pRes)); + // taos_free_result(pRes); + // return; + // } + + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); + tmq_conf_set(conf, "group.id", "cgrpName"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "experimental.snapshot.enable", "true"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, "topic_t1"); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + TAOS_FIELD* fields = NULL; + int32_t numOfFields = 0; + int32_t precision = 0; + int32_t totalRows = 0; + int32_t msgCnt = 0; + int32_t timeout = 5000; + + while (1) { + TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); + if (pRes) { + char buf[1024]; + int32_t rows = 0; + + const char* topicName = tmq_get_topic_name(pRes); + const char* dbName = tmq_get_db_name(pRes); + int32_t vgroupId = tmq_get_vgroup_id(pRes); + + printf("topic: %s\n", topicName); + printf("db: %s\n", dbName); + printf("vgroup id: %d\n", vgroupId); + + while (1) { + TAOS_ROW row = taos_fetch_row(pRes); + if (row == NULL) break; + + fields = taos_fetch_fields(pRes); + numOfFields = taos_field_count(pRes); + precision = taos_result_precision(pRes); + rows++; + taos_print_row(buf, row, fields, numOfFields); + printf("precision: %d, row content: %s\n", precision, buf); + } + } +// return rows; + } + + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); +} #pragma GCC diagnostic pop