fix: remove waitting for empty block & optimize poll logic

This commit is contained in:
wangmm0220 2025-03-04 17:57:33 +08:00
parent 100c65ced5
commit 08601c5dc9
6 changed files with 179 additions and 64 deletions

View File

@ -537,8 +537,10 @@ This document details the server error codes that may be encountered when using
| Error Code | Description | Possible Error Scenarios or Reasons | Recommended Actions for Users |
| ---------- | --------------------- | ------------------------------------------------------------ | -------------------------------------------- |
| 0x800003E6 | Consumer not exist | Consumer timeout offline | rebuild consumer to subscribe data again |
| 0x800003EA | Consumer not ready | Consumer rebalancing | retry after 2s |
| 0x80004000 | Invalid message | The subscribed data is illegal, generally does not occur | Check the client-side error logs for details |
| 0x80004001 | Consumer mismatch | The vnode requested for subscription and the reassigned vnode are inconsistent, usually occurs when new consumers join the same consumer group | Internal error, not exposed to users |
| 0x80004001 | Consumer mismatch | The vnode requested for subscription and the reassigned vnode are inconsistent, usually occurs when new consumers join the same consumer group | Internal error |
| 0x80004002 | Consumer closed | The consumer no longer exists | Check if it has already been closed |
| 0x80004017 | Invalid status, please subscribe topic first | tmq status invalidate | Without calling subscribe, directly poll data |
| 0x80004100 | Stream task not exist | The stream computing task does not exist | Check the server-side error logs |

View File

@ -557,8 +557,10 @@ description: TDengine 服务端的错误码列表和详细说明
| 错误码 | 错误描述 | 可能的出错场景或者可能的原因 | 建议用户采取的措施 |
| ---------- | --------------------- | -------------------------------------------------------------------------------- | ------------------------------ |
| 0x800003E6 | Consumer not exist | Consumer 超时下线 | 重新建consumer订阅数据 |
| 0x800003EA | Consumer not ready | Consumer 正在平衡中 | 等待2秒后重试 |
| 0x80004000 | Invalid message | 订阅到的数据非法,一般不会出现 | 具体查看client端的错误日志提示 |
| 0x80004001 | Consumer mismatch | 订阅请求的vnode和重新分配的vnode不一致一般存在于有新消费者加入相同消费者组里时 | 内部错误,不暴露给用户 |
| 0x80004001 | Consumer mismatch | 订阅请求的vnode和重新分配的vnode不一致一般存在于有新消费者加入相同消费者组里时 | 内部错误 |
| 0x80004002 | Consumer closed | 消费者已经不存在了 | 查看是否已经close掉了 |
| 0x80004017 | Invalid status, please subscribe topic first | 数据订阅状态不对 | 没有调用 subscribe直接 poll 数据 |
| 0x80004100 | Stream task not exist | 流计算任务不存在 | 具体查看server端的错误日志 |

View File

@ -148,7 +148,6 @@ struct tmq_t {
STscObj* pTscObj; // connection
SArray* clientTopics; // SArray<SMqClientTopic>
STaosQueue* mqueue; // queue of rsp
STaosQall* qall;
STaosQueue* delayedTask; // delayed task queue for heartbeat and auto commit
tsem2_t rspSem;
};
@ -946,7 +945,6 @@ static void generateTimedTask(int64_t refId, int32_t type) {
if (code == TSDB_CODE_SUCCESS) {
*pTaskType = type;
if (taosWriteQitem(tmq->delayedTask, pTaskType) == 0) {
tqDebugC("consumer:0x%" PRIx64 " recv poll rsp here 2", tmq->consumerId);
if (tsem2_post(&tmq->rspSem) != 0){
tqErrorC("consumer:0x%" PRIx64 " failed to post sem, type:%d", tmq->consumerId, type);
}
@ -1133,7 +1131,7 @@ void tmqSendHbReq(void* param, void* tmrId) {
tDestroySMqHbReq(&req);
if (tmrId != NULL) {
bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer);
tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat:%d, pollFlag:%d", tmq->consumerId, ret, tmq->pollFlag);
tqDebugC("consumer:0x%" PRIx64 " reset timer for tmq heartbeat ret:%d, interval:%d, pollFlag:%d", tmq->consumerId, ret, tmq->heartBeatIntervalMs, tmq->pollFlag);
}
int32_t ret = taosReleaseRef(tmqMgmt.rsetId, refId);
if (ret != 0){
@ -1485,27 +1483,14 @@ END:
}
static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
STaosQall* qall = NULL;
int32_t code = 0;
code = taosAllocateQall(&qall);
if (code) {
tqErrorC("consumer:0x%" PRIx64 ", failed to allocate qall, code:%s", pTmq->consumerId, tstrerror(code));
return code;
}
int32_t numOfItems = taosReadAllQitems(pTmq->delayedTask, qall);
if (numOfItems == 0) {
taosFreeQall(qall);
return 0;
}
tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems);
int8_t* pTaskType = NULL;
while (taosGetQitem(qall, (void**)&pTaskType) != 0) {
tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, taosQueueItemSize(pTmq->delayedTask));
while (1) {
int8_t* pTaskType = NULL;
taosReadQitem(pTmq->delayedTask, (void**)&pTaskType);
if (pTaskType == NULL) {break;}
if (*pTaskType == TMQ_DELAYED_TASK__ASK_EP) {
tqDebugC("consumer:0x%" PRIx64 " retrieve ask ep timer", pTmq->consumerId);
code = askEp(pTmq, NULL, false, false);
int32_t code = askEp(pTmq, NULL, false, false);
if (code != 0) {
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code));
}
@ -1528,23 +1513,15 @@ static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
taosFreeQitem(pTaskType);
}
taosFreeQall(qall);
return 0;
}
void tmqClearUnhandleMsg(tmq_t* tmq) {
if (tmq == NULL) return;
SMqRspWrapper* rspWrapper = NULL;
while (taosGetQitem(tmq->qall, (void**)&rspWrapper) != 0) {
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(rspWrapper);
}
rspWrapper = NULL;
if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){
return;
}
while (taosGetQitem(tmq->qall, (void**)&rspWrapper) != 0) {
while (1) {
SMqRspWrapper* rspWrapper = NULL;
taosReadQitem(tmq->mqueue, (void**)&rspWrapper);
if (rspWrapper == NULL) break;
tmqFreeRspWrapper(rspWrapper);
taosFreeQitem(rspWrapper);
}
@ -1611,7 +1588,6 @@ void tmqFreeImpl(void* handle) {
taosCloseQueue(tmq->delayedTask);
}
taosFreeQall(tmq->qall);
if(tsem2_destroy(&tmq->rspSem) != 0) {
tqErrorC("failed to destroy sem in free tmq");
}
@ -1737,14 +1713,6 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
goto _failed;
}
code = taosAllocateQall(&pTmq->qall);
if (code) {
tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
pTmq->groupId);
SET_ERROR_MSG_TMQ("allocate qall failed")
goto _failed;
}
if (conf->groupId[0] == 0) {
tqErrorC("consumer:0x%" PRIx64 " setup failed since %s, groupId:%s", pTmq->consumerId, tstrerror(code),
pTmq->groupId);
@ -2126,7 +2094,6 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
}
}
tqDebugC("consumer:0x%" PRIx64 " recv poll rsp here 1", tmq->consumerId);
if (tsem2_post(&tmq->rspSem) != 0){
tqErrorC("failed to post rsp sem, consumer:0x%" PRIx64, tmq->consumerId);
}
@ -2320,7 +2287,7 @@ static int32_t tmqPollImpl(tmq_t* tmq) {
taosWLockLatch(&tmq->lock);
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__LOST){
code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
code = TSDB_CODE_MND_CONSUMER_NOT_EXIST;
goto end;
}
@ -2421,12 +2388,12 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform
code = askEp(tmq, NULL, false, true);
if (code != 0) {
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep wher vnode transform, code:%s", tmq->consumerId, tstrerror(code));
}
} else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
code = askEp(tmq, NULL, false, false);
code = syncAskEp(tmq);
if (code != 0) {
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep when consumer mismatch, code:%s", tmq->consumerId, tstrerror(code));
}
} else if (pRspWrapper->code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){
code = 0;
@ -2563,22 +2530,14 @@ END:
}
static void* tmqHandleAllRsp(tmq_t* tmq) {
tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall));
tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQueueItemSize(tmq->mqueue));
int32_t code = 0;
void* returnVal = NULL;
while (1) {
SMqRspWrapper* pRspWrapper = NULL;
if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) {
code = taosReadAllQitems(tmq->mqueue, tmq->qall);
if (code == 0){
goto END;
}
code = taosGetQitem(tmq->qall, (void**)&pRspWrapper);
if (code == 0) {
goto END;
}
}
taosReadQitem(tmq->mqueue, (void**)&pRspWrapper);
if (pRspWrapper == NULL) {break;}
tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]);
if (pRspWrapper->code != 0) {
@ -2617,9 +2576,6 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
code = tmqHandleAllDelayedTask(tmq);
TSDB_CHECK_CODE(code, lino, END);
code = tmqPollImpl(tmq);
TSDB_CHECK_CODE(code, lino, END);
rspObj = tmqHandleAllRsp(tmq);
if (rspObj) {
tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
@ -2628,6 +2584,9 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
code = terrno;
TSDB_CHECK_CODE(code, lino, END);
code = tmqPollImpl(tmq);
TSDB_CHECK_CODE(code, lino, END);
if (timeout >= 0) {
int64_t currentTime = taosGetTimestampMs();
int64_t elapsedTime = currentTime - startTime;

View File

@ -37,6 +37,10 @@ class TDTestCase:
tdLog.info(cmdStr)
os.system(cmdStr)
cmdStr = '%s/build/bin/tmq_poll_test'%(buildPath)
tdLog.info(cmdStr)
os.system(cmdStr)
return
def stop(self):

View File

@ -9,6 +9,7 @@ add_executable(tmq_td32187 tmq_td32187.c)
add_executable(tmq_ts5776 tmq_ts5776.c)
add_executable(tmq_td32471 tmq_td32471.c)
add_executable(tmq_td33798 tmq_td33798.c)
add_executable(tmq_poll_test tmq_poll_test.c)
add_executable(tmq_write_raw_test tmq_write_raw_test.c)
add_executable(write_raw_block_test write_raw_block_test.c)
add_executable(sml_test sml_test.c)
@ -89,6 +90,13 @@ target_link_libraries(
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_poll_test
PUBLIC ${TAOS_LIB}
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_td32526
PUBLIC ${TAOS_LIB}

View File

@ -0,0 +1,140 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "cJSON.h"
#include "taos.h"
#include "tmsg.h"
#include "types.h"
TAOS_RES* pRes = NULL;
TAOS* pConn = NULL;
TAOS_RES* tmqmessage = NULL;
#define EXEC_SQL(sql) \
pRes = taos_query(pConn,sql);\
ASSERT(taos_errno(pRes) == 0);\
taos_free_result(pRes)
void init_env() {
EXEC_SQL("drop topic if exists topic_db");
EXEC_SQL("drop database if exists db_src");
EXEC_SQL("create database if not exists db_src vgroups 1 wal_retention_period 3600");
EXEC_SQL("use db_src");
EXEC_SQL("create stable if not exists st1 (ts timestamp, c1 int, c2 float, c3 binary(16)) tags(t1 int, t3 nchar(8), t4 bool)");
EXEC_SQL("insert into ct3 using st1(t1) tags(3000) values(1626006833600, 5, 6, 'c')");
EXEC_SQL("create topic topic_db as database db_src");
}
tmq_t* build_consumer(bool testLongHeartBeat) {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "tg2");
tmq_conf_set(conf, "client.id", "my app 1");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
if (testLongHeartBeat){
ASSERT(tmq_conf_set(conf, "session.timeout.ms", "8000") == TMQ_CONF_OK);
ASSERT(tmq_conf_set(conf, "heartbeat.interval.ms", "100000") == TMQ_CONF_OK);
}
tmq_conf_set_auto_commit_cb(conf, NULL, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
ASSERT(tmq != NULL);
tmq_conf_destroy(conf);
return tmq;
}
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "topic_db");
return topic_list;
}
void test_poll_continuity(tmq_t* tmq, tmq_list_t* topics) {
ASSERT ((tmq_subscribe(tmq, topics)) == 0);
tmqmessage = tmq_consumer_poll(tmq, 500);
ASSERT (tmqmessage != NULL);
taos_free_result(tmqmessage);
ASSERT (tmq_unsubscribe(tmq) == 0);
printf("unsubscribe success\n");
ASSERT (tmq_subscribe(tmq, topics) == 0);
printf("subscribe success\n");
tmqmessage = tmq_consumer_poll(tmq, 500);
ASSERT (tmqmessage == NULL);
taos_free_result(tmqmessage);
EXEC_SQL("insert into ct1 using st1(t1) tags(3000) values(1626006833600, 5, 6, 'c')");
printf("insert into ct1\n");
tmqmessage = tmq_consumer_poll(tmq, 500);
ASSERT (tmqmessage != NULL);
taos_free_result(tmqmessage);
}
void test_consumer_offline(tmq_t* tmq, tmq_list_t* topics) {
ASSERT ((tmq_subscribe(tmq, topics)) == 0);
tmqmessage = tmq_consumer_poll(tmq, 500);
ASSERT (tmqmessage != NULL);
taos_free_result(tmqmessage);
taosSsleep(15);
tmqmessage = tmq_consumer_poll(tmq, 500);
ASSERT (tmqmessage == NULL);
ASSERT (taos_errno(NULL) == TSDB_CODE_MND_CONSUMER_NOT_EXIST);
taos_free_result(tmqmessage);
}
int main(int argc, char* argv[]) {
pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
ASSERT (pConn != NULL);
printf("test poll continuity\n");
for (int i = 0; i < 10; i++){
printf("-------run times:%d start---------\n", i);
init_env();
tmq_t* tmq = build_consumer(false);
tmq_list_t* topic_list = build_topic_list();
test_poll_continuity(tmq, topic_list);
ASSERT(tmq_consumer_close(tmq) == 0);
tmq_list_destroy(topic_list);
printf("-------run times:%d end---------\n\n", i);
}
// printf("\n\n\ntest consumer offline\n");
// init_env();
// tmq_t* tmq = build_consumer(true);
// tmq_list_t* topic_list = build_topic_list();
// test_consumer_offline(tmq, topic_list);
// ASSERT(tmq_consumer_close(tmq) == 0);
// tmq_list_destroy(topic_list);
taos_close(pConn);
}