refactor: do some internal refactor and fix race condition.

This commit is contained in:
Haojun Liao 2023-02-17 23:06:19 +08:00
parent e5d4d9daba
commit 7d915626c4
2 changed files with 192 additions and 90 deletions

View File

@ -32,15 +32,15 @@
sem_post(x)
#endif
int32_t tmqAskEp(tmq_t* tmq, bool async);
typedef struct {
struct SMqMgmt{
int8_t inited;
tmr_h timer;
int32_t rsetId;
} SMqMgmt;
};
static SMqMgmt tmqMgmt = {0};
static TdThreadOnce tmqInit = PTHREAD_ONCE_INIT; // initialize only once
volatile int32_t tmqInitRes = 0; // initialize rsp code
static struct SMqMgmt tmqMgmt = {0};
typedef struct {
int8_t tmqRspType;
@ -65,8 +65,7 @@ struct tmq_conf_t {
int8_t withTbName;
int8_t snapEnable;
int32_t snapBatchSize;
bool hbBgEnable;
bool hbBgEnable;
uint16_t port;
int32_t autoCommitInterval;
@ -78,18 +77,17 @@ struct tmq_conf_t {
};
struct tmq_t {
int64_t refId;
int64_t refId;
// conf
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t withTbName;
int8_t useSnapshot;
int8_t autoCommit;
int32_t autoCommitInterval;
int32_t resetOffsetCfg;
int64_t consumerId;
bool hbBgEnable;
char groupId[TSDB_CGROUP_LEN];
char clientId[256];
int8_t withTbName;
int8_t useSnapshot;
int8_t autoCommit;
int32_t autoCommitInterval;
int32_t resetOffsetCfg;
uint64_t consumerId;
bool hbBgEnable;
tmq_commit_cb* commitCb;
void* commitCbUserParam;
@ -221,13 +219,21 @@ typedef struct {
/*int32_t vgId;*/
} SMqCommitCbParam;
static int32_t tmqAskEp(tmq_t* tmq, bool async);
tmq_conf_t* tmq_conf_new() {
tmq_conf_t* conf = taosMemoryCalloc(1, sizeof(tmq_conf_t));
if (conf == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return conf;
}
conf->withTbName = false;
conf->autoCommit = true;
conf->autoCommitInterval = 5000;
conf->resetOffset = TMQ_CONF__RESET_OFFSET__EARLIEAST;
conf->hbBgEnable = true;
return conf;
}
@ -932,31 +938,37 @@ void tmqFreeImpl(void* handle) {
taosMemoryFree(tmq);
}
static void tmqMgmtInit(void) {
tmqInitRes = 0;
tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
if (tmqMgmt.timer == NULL) {
tmqInitRes = TSDB_CODE_OUT_OF_MEMORY;
}
tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
if (tmqMgmt.rsetId != 0) {
tmqInitRes = terrno;
}
}
tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
// init timer
int8_t inited = atomic_val_compare_exchange_8(&tmqMgmt.inited, 0, 1);
if (inited == 0) {
tmqMgmt.timer = taosTmrInit(1000, 100, 360000, "TMQ");
if (tmqMgmt.timer == NULL) {
atomic_store_8(&tmqMgmt.inited, 0);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
tmqMgmt.rsetId = taosOpenRef(10000, tmqFreeImpl);
taosThreadOnce(&tmqInit, tmqMgmtInit);
if (tmqInitRes != 0) {
terrno = tmqInitRes;
return NULL;
}
tmq_t* pTmq = taosMemoryCalloc(1, sizeof(tmq_t));
if (pTmq == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("setting up new consumer failed since %s, consumer group %s", terrstr(), conf->groupId);
tscError("failed to create consumer, consumer group %s, code:%s", conf->groupId, terrstr());
return NULL;
}
const char* user = conf->user == NULL ? TSDB_DEFAULT_USER : conf->user;
const char* pass = conf->pass == NULL ? TSDB_DEFAULT_PASS : conf->pass;
ASSERT(user);
ASSERT(pass);
ASSERT(conf->groupId[0]);
pTmq->clientTopics = taosArrayInit(0, sizeof(SMqClientTopic));
@ -966,7 +978,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
if (pTmq->clientTopics == NULL || pTmq->mqueue == NULL || pTmq->qall == NULL || pTmq->delayedTask == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
tscError("consumer:0x%" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
pTmq->groupId);
goto FAIL;
}
@ -996,7 +1008,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
// init semaphore
if (tsem_init(&pTmq->rspSem, 0, 0) != 0) {
tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
pTmq->groupId);
goto FAIL;
}
@ -1004,7 +1016,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
// init connection
pTmq->pTscObj = taos_connect_internal(conf->ip, user, pass, NULL, NULL, conf->port, CONN_TYPE__TMQ);
if (pTmq->pTscObj == NULL) {
tscError("consumer %" PRId64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
tscError("consumer:0x %" PRIx64 " setup failed since %s, consumer group %s", pTmq->consumerId, terrstr(),
pTmq->groupId);
tsem_destroy(&pTmq->rspSem);
goto FAIL;
@ -1022,8 +1034,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->hbLiveTimer = taosTmrStart(tmqSendHbReq, 1000, pRefId, tmqMgmt.timer);
}
tscInfo("consumer %" PRId64 " is setup, consumer group %s", pTmq->consumerId, pTmq->groupId);
tscInfo("consumer:0x%" PRIx64 " is setup, consumer groupId %s", pTmq->consumerId, pTmq->groupId);
return pTmq;
FAIL:
@ -1032,6 +1043,7 @@ FAIL:
if (pTmq->delayedTask) taosCloseQueue(pTmq->delayedTask);
if (pTmq->qall) taosFreeQall(pTmq->qall);
taosMemoryFree(pTmq);
return NULL;
}
@ -1041,44 +1053,52 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
void* buf = NULL;
SMsgSendInfo* sendInfo = NULL;
SCMSubscribeReq req = {0};
int32_t code = -1;
int32_t code = 0;
tscDebug("tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz);
tscDebug("consumer:0x%"PRIx64", tmq subscribe start, numOfTopic %d", tmq->consumerId, sz);
req.consumerId = tmq->consumerId;
tstrncpy(req.clientId, tmq->clientId, 256);
tstrncpy(req.cgroup, tmq->groupId, TSDB_CGROUP_LEN);
req.topicNames = taosArrayInit(sz, sizeof(void*));
if (req.topicNames == NULL) goto FAIL;
tscDebug("tmq subscribe, consumer: %" PRId64 ", topic num %d", tmq->consumerId, sz);
if (req.topicNames == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
for (int32_t i = 0; i < sz; i++) {
char* topic = taosArrayGetP(container, i);
SName name = {0};
tNameSetDbName(&name, tmq->pTscObj->acctId, topic, strlen(topic));
char* topicFName = taosMemoryCalloc(1, TSDB_TOPIC_FNAME_LEN);
if (topicFName == NULL) {
goto FAIL;
}
tNameExtractFullName(&name, topicFName);
tscDebug("subscribe topic: %s", topicFName);
tNameExtractFullName(&name, topicFName);
tscDebug("consumer:0x%"PRIx64", subscribe topic: %s", tmq->consumerId, topicFName);
taosArrayPush(req.topicNames, &topicFName);
}
int32_t tlen = tSerializeSCMSubscribeReq(NULL, &req);
buf = taosMemoryMalloc(tlen);
if (buf == NULL) goto FAIL;
if (buf == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
void* abuf = buf;
tSerializeSCMSubscribeReq(&abuf, &req);
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) goto FAIL;
if (sendInfo == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto FAIL;
}
SMqSubscribeCbParam param = {
.rspErr = 0,
@ -1086,7 +1106,9 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
.epoch = tmq->epoch,
};
if (tsem_init(&param.rspSem, 0, 0) != 0) goto FAIL;
if (tsem_init(&param.rspSem, 0, 0) != 0) {
goto FAIL;
}
sendInfo->msgInfo = (SDataBuf){
.pData = buf,
@ -1112,15 +1134,18 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
tsem_wait(&param.rspSem);
tsem_destroy(&param.rspSem);
code = param.rspErr;
if (code != 0) goto FAIL;
if (param.rspErr != 0) {
code = param.rspErr;
goto FAIL;
}
int32_t retryCnt = 0;
while (TSDB_CODE_MND_CONSUMER_NOT_READY == tmqAskEp(tmq, false)) {
if (retryCnt++ > 10) {
goto FAIL;
}
tscDebug("consumer not ready, retry");
tscDebug("consumer:0x%"PRIx64", mnd not ready for subscribe, retry count:%d in 500ms", tmq->consumerId, retryCnt);
taosMsleep(500);
}
@ -1138,7 +1163,6 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
tmq->commitTimer = taosTmrStart(tmqAssignDelayedCommitTask, tmq->autoCommitInterval, pRefId2, tmqMgmt.timer);
}
code = 0;
FAIL:
taosArrayDestroyP(req.topicNames, taosMemoryFree);
taosMemoryFree(buf);
@ -1434,7 +1458,7 @@ END:
}
int32_t tmqAskEp(tmq_t* tmq, bool async) {
int32_t code = 0;
int32_t code = TSDB_CODE_SUCCESS;
#if 0
int8_t epStatus = atomic_val_compare_exchange_8(&tmq->epStatus, 0, 1);
if (epStatus == 1) {
@ -1444,6 +1468,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
}
atomic_store_32(&tmq->epSkipCnt, 0);
#endif
SMqAskEpReq req = {0};
req.consumerId = tmq->consumerId;
req.epoch = tmq->epoch;
@ -1451,27 +1476,31 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
if (tlen < 0) {
tscError("tSerializeSMqAskEpReq failed");
tscError("consumer:0x%"PRIx64", tSerializeSMqAskEpReq failed", tmq->consumerId);
return -1;
}
void* pReq = taosMemoryCalloc(1, tlen);
if (pReq == NULL) {
tscError("failed to malloc askEpReq msg, size:%d", tlen);
tscError("consumer:0x%"PRIx64", failed to malloc askEpReq msg, size:%d", tmq->consumerId, tlen);
terrno = TSDB_CODE_OUT_OF_MEMORY;
return -1;
}
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
tscError("tSerializeSMqAskEpReq %d failed", tlen);
tscError("consumer:0x%"PRIx64", tSerializeSMqAskEpReq %d failed", tmq->consumerId, tlen);
taosMemoryFree(pReq);
return -1;
}
SMqAskEpCbParam* pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
if (pParam == NULL) {
tscError("failed to malloc subscribe param");
tscError("consumer:0x%"PRIx64", failed to malloc subscribe param", tmq->consumerId);
taosMemoryFree(pReq);
/*atomic_store_8(&tmq->epStatus, 0);*/
return -1;
}
pParam->refId = tmq->refId;
pParam->epoch = tmq->epoch;
pParam->async = async;
@ -1499,8 +1528,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
SEpSet epSet = getEpSet_s(&tmq->pTscObj->pAppInfo->mgmtEp);
tscDebug("consumer:%" PRId64 ", ask ep", tmq->consumerId);
tscDebug("consumer:0x%" PRIx64 ", ask ep from mnode", tmq->consumerId);
int64_t transporterId = 0;
asyncSendMsgToServer(tmq->pTscObj->pAppInfo->pTransporter, &epSet, &transporterId, sendInfo);
@ -1510,6 +1538,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool async) {
code = pParam->code;
taosMemoryFree(pParam);
}
return code;
}

View File

@ -162,6 +162,11 @@ void *queryThread(void *arg) {
}
static int32_t numOfThreads = 1;
void tmq_commit_cb_print(tmq_t *pTmq, int32_t code, void *param) {
printf("success, code:%d\n", code);
}
} // namespace
int main(int argc, char** argv) {
@ -176,12 +181,12 @@ int main(int argc, char** argv) {
return RUN_ALL_TESTS();
}
TEST(testCase, driverInit_Test) {
TEST(clientCase, driverInit_Test) {
// taosInitGlobalCfg();
// taos_init();
}
TEST(testCase, connect_Test) {
TEST(clientCase, connect_Test) {
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
@ -190,8 +195,8 @@ TEST(testCase, connect_Test) {
}
taos_close(pConn);
}
#if 0
TEST(testCase, create_user_Test) {
TEST(clientCase, create_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -204,7 +209,7 @@ TEST(testCase, create_user_Test) {
taos_close(pConn);
}
TEST(testCase, create_account_Test) {
TEST(clientCase, create_account_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -217,7 +222,7 @@ TEST(testCase, create_account_Test) {
taos_close(pConn);
}
TEST(testCase, drop_account_Test) {
TEST(clientCase, drop_account_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -230,7 +235,7 @@ TEST(testCase, drop_account_Test) {
taos_close(pConn);
}
TEST(testCase, show_user_Test) {
TEST(clientCase, show_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -250,7 +255,7 @@ TEST(testCase, show_user_Test) {
taos_close(pConn);
}
TEST(testCase, drop_user_Test) {
TEST(clientCase, drop_user_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -263,7 +268,7 @@ TEST(testCase, drop_user_Test) {
taos_close(pConn);
}
TEST(testCase, show_db_Test) {
TEST(clientCase, show_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -282,7 +287,7 @@ TEST(testCase, show_db_Test) {
taos_close(pConn);
}
TEST(testCase, create_db_Test) {
TEST(clientCase, create_db_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -306,7 +311,7 @@ TEST(testCase, create_db_Test) {
taos_close(pConn);
}
TEST(testCase, create_dnode_Test) {
TEST(clientCase, create_dnode_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -325,7 +330,7 @@ TEST(testCase, create_dnode_Test) {
taos_close(pConn);
}
TEST(testCase, drop_dnode_Test) {
TEST(clientCase, drop_dnode_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -349,7 +354,7 @@ TEST(testCase, drop_dnode_Test) {
taos_close(pConn);
}
TEST(testCase, use_db_test) {
TEST(clientCase, use_db_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -367,7 +372,7 @@ TEST(testCase, use_db_test) {
taos_close(pConn);
}
// TEST(testCase, drop_db_test) {
// TEST(clientCase, drop_db_test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != NULL);
//
@ -389,7 +394,7 @@ TEST(testCase, use_db_test) {
// taos_close(pConn);
//}
TEST(testCase, create_stable_Test) {
TEST(clientCase, create_stable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -428,7 +433,7 @@ TEST(testCase, create_stable_Test) {
taos_close(pConn);
}
TEST(testCase, create_table_Test) {
TEST(clientCase, create_table_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -447,7 +452,7 @@ TEST(testCase, create_table_Test) {
taos_close(pConn);
}
TEST(testCase, create_ctable_Test) {
TEST(clientCase, create_ctable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -472,7 +477,7 @@ TEST(testCase, create_ctable_Test) {
taos_close(pConn);
}
TEST(testCase, show_stable_Test) {
TEST(clientCase, show_stable_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != nullptr);
@ -497,7 +502,7 @@ TEST(testCase, show_stable_Test) {
taos_close(pConn);
}
TEST(testCase, show_vgroup_Test) {
TEST(clientCase, show_vgroup_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -529,7 +534,7 @@ TEST(testCase, show_vgroup_Test) {
taos_close(pConn);
}
TEST(testCase, create_multiple_tables) {
TEST(clientCase, create_multiple_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -600,7 +605,7 @@ TEST(testCase, create_multiple_tables) {
taos_close(pConn);
}
TEST(testCase, show_table_Test) {
TEST(clientCase, show_table_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
assert(pConn != NULL);
@ -634,7 +639,7 @@ TEST(testCase, show_table_Test) {
taos_close(pConn);
}
//TEST(testCase, drop_stable_Test) {
//TEST(clientCase, drop_stable_Test) {
// TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
// assert(pConn != nullptr);
//
@ -659,7 +664,7 @@ TEST(testCase, show_table_Test) {
// taos_close(pConn);
//}
TEST(testCase, generated_request_id_test) {
TEST(clientCase, generated_request_id_test) {
SHashObj* phash = taosHashInit(10000, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), false, HASH_ENTRY_LOCK);
for (int32_t i = 0; i < 50000; ++i) {
@ -675,7 +680,7 @@ TEST(testCase, generated_request_id_test) {
taosHashCleanup(phash);
}
TEST(testCase, insert_test) {
TEST(clientCase, insert_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -692,9 +697,8 @@ TEST(testCase, insert_test) {
taos_free_result(pRes);
taos_close(pConn);
}
#endif
TEST(testCase, projection_query_tables) {
TEST(clientCase, projection_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -752,8 +756,7 @@ TEST(testCase, projection_query_tables) {
taos_close(pConn);
}
#if 0
TEST(testCase, tsbs_perf_test) {
TEST(clientCase, tsbs_perf_test) {
TdThread qid[20] = {0};
for(int32_t i = 0; i < numOfThreads; ++i) {
@ -762,7 +765,7 @@ TEST(testCase, tsbs_perf_test) {
getchar();
}
TEST(testCase, projection_query_stables) {
TEST(clientCase, projection_query_stables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -790,7 +793,7 @@ TEST(testCase, projection_query_stables) {
taos_close(pConn);
}
TEST(testCase, agg_query_tables) {
TEST(clientCase, agg_query_tables) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -825,7 +828,7 @@ create table tm1 using m1 tags(2);
insert into tm0 values('2021-1-1 1:1:1.120', 1) ('2021-1-1 1:1:2.9', 2) tm1 values('2021-1-1 1:1:1.120', 11) ('2021-1-1 1:1:2.99', 22);
*/
TEST(testCase, async_api_test) {
TEST(clientCase, async_api_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -859,7 +862,7 @@ TEST(testCase, async_api_test) {
taos_close(pConn);
}
TEST(testCase, update_test) {
TEST(clientCase, update_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
@ -895,6 +898,76 @@ TEST(testCase, update_test) {
}
}
#endif
TEST(clientCase, subscription_test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT_NE(pConn, nullptr);
// TAOS_RES* pRes = taos_query(pConn, "create topic topic_t1 as select * from t1");
// if (taos_errno(pRes) != TSDB_CODE_SUCCESS) {
// printf("failed to create topic, code:%s", taos_errstr(pRes));
// taos_free_result(pRes);
// return;
// }
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.commit.interval.ms", "1000");
tmq_conf_set(conf, "group.id", "cgrpName");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "experimental.snapshot.enable", "true");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
tmq_conf_destroy(conf);
// 创建订阅 topics 列表
tmq_list_t* topicList = tmq_list_new();
tmq_list_append(topicList, "topic_t1");
// 启动订阅
tmq_subscribe(tmq, topicList);
tmq_list_destroy(topicList);
TAOS_FIELD* fields = NULL;
int32_t numOfFields = 0;
int32_t precision = 0;
int32_t totalRows = 0;
int32_t msgCnt = 0;
int32_t timeout = 5000;
while (1) {
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
if (pRes) {
char buf[1024];
int32_t rows = 0;
const char* topicName = tmq_get_topic_name(pRes);
const char* dbName = tmq_get_db_name(pRes);
int32_t vgroupId = tmq_get_vgroup_id(pRes);
printf("topic: %s\n", topicName);
printf("db: %s\n", dbName);
printf("vgroup id: %d\n", vgroupId);
while (1) {
TAOS_ROW row = taos_fetch_row(pRes);
if (row == NULL) break;
fields = taos_fetch_fields(pRes);
numOfFields = taos_field_count(pRes);
precision = taos_result_precision(pRes);
rows++;
taos_print_row(buf, row, fields, numOfFields);
printf("precision: %d, row content: %s\n", precision, buf);
}
}
// return rows;
}
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
}
#pragma GCC diagnostic pop