From 4436eb7e0f639c12b6619ccbe9b23da94cd6e55a Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Fri, 7 Jul 2023 19:12:18 +0800 Subject: [PATCH] fix:set commitOffset is send msg success & optimize log & add test cases for TD-25129 --- source/client/src/clientTmq.c | 4 +- source/client/test/clientTests.cpp | 140 +++++++++++++++++++++++++++++ source/dnode/vnode/src/tq/tq.c | 4 +- 3 files changed, 145 insertions(+), 3 deletions(-) diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 3fbde36e89..5fd79a2711 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -678,6 +678,8 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm taosMemoryFree(pParamSet); pCommitFp(tmq, code, userParam); } + // update the offset value. + pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset; } else { // do not perform commit, callback user function directly. taosMemoryFree(pParamSet); pCommitFp(tmq, code, userParam); @@ -2712,7 +2714,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a // char offsetBuf[TSDB_OFFSET_LEN] = {0}; // tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset); - tscInfo("vgId:%d offset is old to:%"PRId64, p->vgId, p->currentOffset); + tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset); pOffsetInfo->walVerBegin = p->begin; pOffsetInfo->walVerEnd = p->end; diff --git a/source/client/test/clientTests.cpp b/source/client/test/clientTests.cpp index ccc17289b0..3c46d17802 100644 --- a/source/client/test/clientTests.cpp +++ b/source/client/test/clientTests.cpp @@ -1073,6 +1073,146 @@ TEST(clientCase, sub_db_test) { fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); } +TEST(clientCase, td_25129) { +// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); + + TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0); + ASSERT_NE(pConn, nullptr); + + tmq_conf_t* conf = tmq_conf_new(); + + tmq_conf_set(conf, "enable.auto.commit", "false"); + tmq_conf_set(conf, "auto.commit.interval.ms", "2000"); + tmq_conf_set(conf, "group.id", "group_id_2"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + tmq_conf_destroy(conf); + + // 创建订阅 topics 列表 + tmq_list_t* topicList = tmq_list_new(); + tmq_list_append(topicList, "tp"); + + // 启动订阅 + tmq_subscribe(tmq, topicList); + tmq_list_destroy(topicList); + + TAOS_FIELD* fields = NULL; + int32_t numOfFields = 0; + int32_t precision = 0; + int32_t totalRows = 0; + int32_t msgCnt = 0; + int32_t timeout = 2000; + + int32_t count = 0; + + tmq_topic_assignment* pAssign = NULL; + int32_t numOfAssign = 0; + + int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + +// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4); + tmq_free_assignment(pAssign); + + code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + + tmq_free_assignment(pAssign); + + code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + + while (1) { + TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout); + if (pRes) { + char buf[128]; + + const char* topicName = tmq_get_topic_name(pRes); +// const char* dbName = tmq_get_db_name(pRes); +// int32_t vgroupId = tmq_get_vgroup_id(pRes); +// +// printf("topic: %s\n", topicName); +// printf("db: %s\n", dbName); +// printf("vgroup id: %d\n", vgroupId); + + printSubResults(pRes, &totalRows); + } else { + tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset); + tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset); + continue; + } + +// tmq_commit_sync(tmq, pRes); + if (pRes != NULL) { + taos_free_result(pRes); + // if ((++count) > 1) { + // break; + // } + } else { + break; + } + +// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin); + } + + tmq_free_assignment(pAssign); + + code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign); + if (code != 0) { + printf("error occurs:%s\n", tmq_err2str(code)); + tmq_free_assignment(pAssign); + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); + return; + } + + for(int i = 0; i < numOfAssign; i++){ + printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end); + } + + tmq_consumer_close(tmq); + taos_close(pConn); + fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows); +} + TEST(clientCase, sub_tb_test) { taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg"); diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 2d757fff08..c310ca4d40 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -490,7 +490,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { if (!exec) { tqSetHandleExec(pHandle); // qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS); - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, + tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId, req.subKey, pHandle); taosWUnLockLatch(&pTq->lock); break; @@ -518,7 +518,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) { code = tqExtractDataForMq(pTq, pHandle, &req, pMsg); tqSetHandleIdle(pHandle); - tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId, + tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId, req.subKey, pHandle); return code; }