fix(tmq): set the correct offset rsp when no poll occuring.
This commit is contained in:
parent
1e845acabe
commit
f61f37c936
|
@ -1100,7 +1100,7 @@ TEST(clientCase, sub_tb_test) {
|
||||||
|
|
||||||
// 创建订阅 topics 列表
|
// 创建订阅 topics 列表
|
||||||
tmq_list_t* topicList = tmq_list_new();
|
tmq_list_t* topicList = tmq_list_new();
|
||||||
tmq_list_append(topicList, "topic_t1");
|
tmq_list_append(topicList, "t1");
|
||||||
|
|
||||||
// 启动订阅
|
// 启动订阅
|
||||||
tmq_subscribe(tmq, topicList);
|
tmq_subscribe(tmq, topicList);
|
||||||
|
@ -1118,7 +1118,7 @@ TEST(clientCase, sub_tb_test) {
|
||||||
tmq_topic_assignment* pAssign = NULL;
|
tmq_topic_assignment* pAssign = NULL;
|
||||||
int32_t numOfAssign = 0;
|
int32_t numOfAssign = 0;
|
||||||
|
|
||||||
int32_t code = tmq_get_topic_assignment(tmq, "topic_t1", &pAssign, &numOfAssign);
|
int32_t code = tmq_get_topic_assignment(tmq, "t1", &pAssign, &numOfAssign);
|
||||||
if (code != 0) {
|
if (code != 0) {
|
||||||
printf("error occurs:%s\n", tmq_err2str(code));
|
printf("error occurs:%s\n", tmq_err2str(code));
|
||||||
tmq_consumer_close(tmq);
|
tmq_consumer_close(tmq);
|
||||||
|
@ -1127,7 +1127,16 @@ TEST(clientCase, sub_tb_test) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
tmq_offset_seek(tmq, "topic_t1", pAssign[0].vgId, 0);
|
tmq_offset_seek(tmq, "t1", pAssign[0].vgId, 4);
|
||||||
|
|
||||||
|
code = tmq_get_topic_assignment(tmq, "t1", &pAssign, &numOfAssign);
|
||||||
|
if (code != 0) {
|
||||||
|
printf("error occurs:%s\n", tmq_err2str(code));
|
||||||
|
tmq_consumer_close(tmq);
|
||||||
|
taos_close(pConn);
|
||||||
|
fprintf(stderr, "%d msg consumed, include %d rows\n", msgCnt, totalRows);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
while (1) {
|
while (1) {
|
||||||
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
|
TAOS_RES* pRes = tmq_consumer_poll(tmq, timeout);
|
||||||
|
|
|
@ -510,8 +510,6 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
int64_t sver = 0, ever = 0;
|
int64_t sver = 0, ever = 0;
|
||||||
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
|
walReaderValidVersionRange(pHandle->execHandle.pTqReader->pWalReader, &sver, &ever);
|
||||||
|
|
||||||
int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
|
|
||||||
|
|
||||||
SMqDataRsp dataRsp = {0};
|
SMqDataRsp dataRsp = {0};
|
||||||
tqInitDataRsp(&dataRsp, &req);
|
tqInitDataRsp(&dataRsp, &req);
|
||||||
|
|
||||||
|
@ -537,7 +535,12 @@ int32_t tqProcessVgWalInfoReq(STQ* pTq, SRpcMsg* pMsg) {
|
||||||
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
|
dataRsp.rspOffset.type = TMQ_OFFSET__LOG;
|
||||||
|
|
||||||
if (reqOffset.type == TMQ_OFFSET__LOG) {
|
if (reqOffset.type == TMQ_OFFSET__LOG) {
|
||||||
dataRsp.rspOffset.version = currentVer; // return current consume offset value
|
int64_t currentVer = walReaderGetCurrentVer(pHandle->execHandle.pTqReader->pWalReader);
|
||||||
|
if (currentVer == -1) { // not start to read data from wal yet, return req offset directly
|
||||||
|
dataRsp.rspOffset.version = reqOffset.version;
|
||||||
|
} else {
|
||||||
|
dataRsp.rspOffset.version = currentVer; // return current consume offset value
|
||||||
|
}
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_EARLIEAST) {
|
||||||
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
|
dataRsp.rspOffset.version = sver; // not consume yet, set the earliest position
|
||||||
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
} else if (reqOffset.type == TMQ_OFFSET__RESET_LATEST) {
|
||||||
|
|
Loading…
Reference in New Issue