fix:set commitOffset is send msg success & optimize log & add test cases for TD-25129
This commit is contained in:
parent
b7bf4925ee
commit
4436eb7e0f
|
@ -678,6 +678,8 @@ static void asyncCommitOffset(tmq_t* tmq, const TAOS_RES* pRes, int32_t type, tm
|
|||
taosMemoryFree(pParamSet);
|
||||
pCommitFp(tmq, code, userParam);
|
||||
}
|
||||
// update the offset value.
|
||||
pVg->offsetInfo.committedOffset = pVg->offsetInfo.currentOffset;
|
||||
} else { // do not perform commit, callback user function directly.
|
||||
taosMemoryFree(pParamSet);
|
||||
pCommitFp(tmq, code, userParam);
|
||||
|
@ -2712,7 +2714,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a
|
|||
// char offsetBuf[TSDB_OFFSET_LEN] = {0};
|
||||
// tFormatOffset(offsetBuf, tListLen(offsetBuf), &pOffsetInfo->currentOffset);
|
||||
|
||||
tscInfo("vgId:%d offset is old to:%"PRId64, p->vgId, p->currentOffset);
|
||||
tscInfo("vgId:%d offset is update to:%"PRId64, p->vgId, p->currentOffset);
|
||||
|
||||
pOffsetInfo->walVerBegin = p->begin;
|
||||
pOffsetInfo->walVerEnd = p->end;
|
||||
|
|
|
@ -1073,6 +1073,146 @@ TEST(clientCase, sub_db_test) {
|
|||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
}
|
||||
|
||||
TEST(clientCase, td_25129) {
|
||||
// taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
|
||||
|
||||
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
|
||||
ASSERT_NE(pConn, nullptr);
|
||||
|
||||
tmq_conf_t* conf = tmq_conf_new();
|
||||
|
||||
tmq_conf_set(conf, "enable.auto.commit", "false");
|
||||
tmq_conf_set(conf, "auto.commit.interval.ms", "2000");
|
||||
tmq_conf_set(conf, "group.id", "group_id_2");
|
||||
tmq_conf_set(conf, "td.connect.user", "root");
|
||||
tmq_conf_set(conf, "td.connect.pass", "taosdata");
|
||||
tmq_conf_set(conf, "auto.offset.reset", "earliest");
|
||||
tmq_conf_set(conf, "msg.with.table.name", "true");
|
||||
|
||||
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
|
||||
tmq_conf_destroy(conf);
|
||||
|
||||
// 创建订阅 topics 列表
|
||||
tmq_list_t* topicList = tmq_list_new();
|
||||
tmq_list_append(topicList, "tp");
|
||||
|
||||
// 启动订阅
|
||||
tmq_subscribe(tmq, topicList);
|
||||
tmq_list_destroy(topicList);
|
||||
|
||||
TAOS_FIELD* fields = NULL;
|
||||
int32_t numOfFields = 0;
|
||||
int32_t precision = 0;
|
||||
int32_t totalRows = 0;
|
||||
int32_t msgCnt = 0;
|
||||
int32_t timeout = 2000;
|
||||
|
||||
int32_t count = 0;
|
||||
|
||||
tmq_topic_assignment* pAssign = NULL;
|
||||
int32_t numOfAssign = 0;
|
||||
|
||||
int32_t code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
return;
|
||||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
|
||||
}
|
||||
|
||||
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, 4);
|
||||
tmq_free_assignment(pAssign);
|
||||
|
||||
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
return;
|
||||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
|
||||
}
|
||||
|
||||
tmq_free_assignment(pAssign);
|
||||
|
||||
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
return;
|
||||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
|
||||
}
|
||||
|
||||
while (1) {
|
||||
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
|
||||
if (pRes) {
|
||||
char buf[128];
|
||||
|
||||
const char* topicName = tmq_get_topic_name(pRes);
|
||||
// const char* dbName = tmq_get_db_name(pRes);
|
||||
// int32_t vgroupId = tmq_get_vgroup_id(pRes);
|
||||
//
|
||||
// printf("topic: %s\n", topicName);
|
||||
// printf("db: %s\n", dbName);
|
||||
// printf("vgroup id: %d\n", vgroupId);
|
||||
|
||||
printSubResults(pRes, &totalRows);
|
||||
} else {
|
||||
tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].currentOffset);
|
||||
tmq_offset_seek(tmq, "tp", pAssign[1].vgId, pAssign[1].currentOffset);
|
||||
continue;
|
||||
}
|
||||
|
||||
// tmq_commit_sync(tmq, pRes);
|
||||
if (pRes != NULL) {
|
||||
taos_free_result(pRes);
|
||||
// if ((++count) > 1) {
|
||||
// break;
|
||||
// }
|
||||
} else {
|
||||
break;
|
||||
}
|
||||
|
||||
// tmq_offset_seek(tmq, "tp", pAssign[0].vgId, pAssign[0].begin);
|
||||
}
|
||||
|
||||
tmq_free_assignment(pAssign);
|
||||
|
||||
code = tmq_get_topic_assignment(tmq, "tp", &pAssign, &numOfAssign);
|
||||
if (code != 0) {
|
||||
printf("error occurs:%s\n", tmq_err2str(code));
|
||||
tmq_free_assignment(pAssign);
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
return;
|
||||
}
|
||||
|
||||
for(int i = 0; i < numOfAssign; i++){
|
||||
printf("assign i:%d, vgId:%d, offset:%lld, start:%lld, end:%lld\n", i, pAssign[i].vgId, pAssign[i].currentOffset, pAssign[i].begin, pAssign[i].end);
|
||||
}
|
||||
|
||||
tmq_consumer_close(tmq);
|
||||
taos_close(pConn);
|
||||
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||
}
|
||||
|
||||
TEST(clientCase, sub_tb_test) {
|
||||
taos_options(TSDB_OPTION_CONFIGDIR, "~/first/cfg");
|
||||
|
||||
|
|
|
@ -490,7 +490,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
if (!exec) {
|
||||
tqSetHandleExec(pHandle);
|
||||
// qSetTaskCode(pHandle->execHandle.task, TDB_CODE_SUCCESS);
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle exec, pHandle:%p", consumerId, vgId,
|
||||
req.subKey, pHandle);
|
||||
taosWUnLockLatch(&pTq->lock);
|
||||
break;
|
||||
|
@ -518,7 +518,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
|
|||
code = tqExtractDataForMq(pTq, pHandle, &req, pMsg);
|
||||
tqSetHandleIdle(pHandle);
|
||||
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 "vgId:%d, topic:%s, , set handle idle, pHandle:%p", consumerId, vgId,
|
||||
tqDebug("tmq poll: consumer:0x%" PRIx64 " vgId:%d, topic:%s, set handle idle, pHandle:%p", consumerId, vgId,
|
||||
req.subKey, pHandle);
|
||||
return code;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue