Merge pull request #29519 from taosdata/fix/TD-32471

fix:[TD-32471]set error code to terrno if tmq_consumer_poll return NULL
This commit is contained in:
Shengliang Guan 2025-01-09 13:33:27 +08:00 committed by GitHub
commit 18e562bdc1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
11 changed files with 171 additions and 10 deletions

View File

@ -535,6 +535,5 @@ This document details the server error codes that may be encountered when using
| 0x80004001 | Consumer mismatch | The vnode requested for subscription and the reassigned vnode are inconsistent, usually occurs when new consumers join the same consumer group | Internal error, not exposed to users |
| 0x80004002 | Consumer closed | The consumer no longer exists | Check if it has already been closed |
| 0x80004017 | Invalid status, please subscribe topic first | tmq status invalidate | Without calling subscribe, directly poll data |
| 0x80004018 | TMQ poll timeout | timeout is too small or there is no data to consume | Adjust the timeout parameter appropriately or check if the data has been consumed |
| 0x80004100 | Stream task not exist | The stream computing task does not exist | Check the server-side error logs |

View File

@ -555,6 +555,5 @@ description: TDengine 服务端的错误码列表和详细说明
| 0x80004001 | Consumer mismatch | 订阅请求的vnode和重新分配的vnode不一致一般存在于有新消费者加入相同消费者组里时 | 内部错误,不暴露给用户 |
| 0x80004002 | Consumer closed | 消费者已经不存在了 | 查看是否已经close掉了 |
| 0x80004017 | Invalid status, please subscribe topic first | 数据订阅状态不对 | 没有调用 subscribe直接poll数据 |
| 0x80004018 | TMQ poll timeout | 数据订阅超时,超时时间太短,或者数据消费完毕 | 可适当调大timeout 参数或者检测数据是否消费完毕 |
| 0x80004100 | Stream task not exist | 流计算任务不存在 | 具体查看server端的错误日志 |

View File

@ -1016,7 +1016,6 @@ int32_t taosGetErrSize();
#define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015)
#define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016)
#define TSDB_CODE_TMQ_INVALID_STATUS TAOS_DEF_ERROR_CODE(0, 0x4017)
#define TSDB_CODE_TMQ_POLL_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x4018)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)

View File

@ -64,6 +64,7 @@ enum {
enum {
TMQ_CONSUMER_STATUS__INIT = 0,
TMQ_CONSUMER_STATUS__READY,
TMQ_CONSUMER_STATUS__LOST,
TMQ_CONSUMER_STATUS__CLOSED,
};
@ -1318,6 +1319,9 @@ static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) {
if (code != TSDB_CODE_SUCCESS) {
if (code != TSDB_CODE_MND_CONSUMER_NOT_READY){
tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code));
if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST){
atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__LOST);
}
}
goto END;
}
@ -2253,8 +2257,13 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
return TSDB_CODE_INVALID_MSG;
}
int32_t code = 0;
taosWLockLatch(&tmq->lock);
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__LOST){
code = TSDB_CODE_TMQ_CONSUMER_MISMATCH;
goto end;
}
int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics);
tqDebugC("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics);
@ -2366,7 +2375,7 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
if (code != 0) {
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
}
} else if (code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){
} else if (pRspWrapper->code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){
code = 0;
}
tqInfoC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId,
@ -2533,7 +2542,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
if (timeout >= 0) {
int64_t currentTime = taosGetTimestampMs();
int64_t elapsedTime = currentTime - startTime;
TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, code, lino, END, TSDB_CODE_TMQ_POLL_TIMEOUT);
TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, code, lino, END, 0);
(void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime));
} else {
(void)tsem2_timewait(&tmq->rspSem, 1000);
@ -2578,7 +2587,7 @@ int32_t tmq_unsubscribe(tmq_t* tmq) {
tqInfoC("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status);
displayConsumeStatistics(tmq);
if (status != TMQ_CONSUMER_STATUS__READY) {
if (status != TMQ_CONSUMER_STATUS__READY && status != TMQ_CONSUMER_STATUS__LOST) {
tqInfoC("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, no need unsubscribe", tmq->consumerId, status);
goto END;
}

View File

@ -263,7 +263,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) {
MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer));
MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user));
atomic_store_32(&pConsumer->hbStatus, 0);
mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d %d", consumerId, req.pollFlag, pConsumer->pollStatus);
mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d pollStatus:%d", consumerId, req.pollFlag, pConsumer->pollStatus);
if (req.pollFlag == 1){
atomic_store_32(&pConsumer->pollStatus, 0);
}

View File

@ -859,7 +859,6 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_STATUS, "Invalid status, please subscribe topic first")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_POLL_TIMEOUT, "TMQ poll timeout")
// stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")

View File

@ -334,6 +334,7 @@
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-33225.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td32526.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td32471.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py
,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py
,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py

View File

@ -0,0 +1,54 @@
import taos
import sys
import time
import socket
import os
import threading
from util.log import *
from util.sql import *
from util.cases import *
from util.dnodes import *
from util.common import *
from taos.tmq import *
sys.path.append("./7-tmq")
from tmqCommon import *
class TDTestCase:
updatecfgDict = {'debugFlag': 135, 'asynclog': 0}
def init(self, conn, logSql, replicaVar=1):
self.replicaVar = int(replicaVar)
tdLog.debug(f"start to excute {__file__}")
tdSql.init(conn.cursor())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def run(self):
tdSql.execute(f'create database if not exists db_32471')
tdSql.execute(f'use db_32471')
tdSql.execute(f'CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)')
tdSql.execute("INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-05 14:38:05.000',10.30000,219,0.31000)")
buildPath = tdCom.getBuildPath()
cmdStr = '%s/build/bin/tmq_td32471'%(buildPath)
# tdLog.info(cmdStr)
# os.system(cmdStr)
#
# tdSql.execute("drop topic db_32471_topic")
tdSql.execute(f'alter stable meters add column item_tags nchar(500)')
tdSql.execute(f'alter stable meters add column new_col nchar(100)')
tdSql.execute("create topic db_32471_topic as select * from db_32471.meters")
tdSql.execute("INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-06 14:38:05.000',10.30000,219,0.31000, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', '1')")
tdLog.info(cmdStr)
if os.system(cmdStr) != 0:
tdLog.exit(cmdStr)
return
def stop(self):
tdSql.close()
tdLog.success(f"{__file__} successfully executed")
tdCases.addLinux(__file__, TDTestCase())
tdCases.addWindows(__file__, TDTestCase())

View File

@ -6,6 +6,7 @@ add_executable(tmq_taosx_ci tmq_taosx_ci.c)
add_executable(tmq_ts5466 tmq_ts5466.c)
add_executable(tmq_td32526 tmq_td32526.c)
add_executable(tmq_td32187 tmq_td32187.c)
add_executable(tmq_td32471 tmq_td32471.c)
add_executable(tmq_write_raw_test tmq_write_raw_test.c)
add_executable(write_raw_block_test write_raw_block_test.c)
add_executable(sml_test sml_test.c)
@ -72,6 +73,13 @@ target_link_libraries(
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_td32471
PUBLIC ${TAOS_LIB}
PUBLIC util
PUBLIC common
PUBLIC os
)
target_link_libraries(
tmq_td32526
PUBLIC ${TAOS_LIB}

View File

@ -0,0 +1,93 @@
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <time.h>
#include "cJSON.h"
#include "taos.h"
#include "tmsg.h"
#include "types.h"
void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) {
printf("commit %d tmq %p param %p\n", code, tmq, param);
}
tmq_t* build_consumer() {
tmq_conf_t* conf = tmq_conf_new();
tmq_conf_set(conf, "group.id", "g1");
tmq_conf_set(conf, "client.id", "my app 1");
tmq_conf_set(conf, "td.connect.user", "root");
tmq_conf_set(conf, "td.connect.pass", "taosdata");
tmq_conf_set(conf, "msg.with.table.name", "true");
tmq_conf_set(conf, "enable.auto.commit", "true");
tmq_conf_set(conf, "auto.offset.reset", "earliest");
tmq_conf_set(conf, "msg.consume.excluded", "1");
tmq_conf_set(conf, "max.poll.interval.ms", "2000");
tmq_conf_set(conf, "heartbeat.interval.ms", "100");
tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL);
tmq_t* tmq = tmq_consumer_new(conf, NULL, 0);
assert(tmq);
tmq_conf_destroy(conf);
return tmq;
}
tmq_list_t* build_topic_list() {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "db_32471_topic");
return topic_list;
}
void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
int32_t code;
if ((code = tmq_subscribe(tmq, topics))) {
fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code));
printf("subscribe err\n");
return;
}
int32_t cnt = 0;
while (1) {
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
if (tmqmessage) {
cnt++;
taos_free_result(tmqmessage);
} else {
ASSERT(taos_errno(NULL) == 0);
break;
}
}
taosSsleep(5);
TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000);
ASSERT(tmqmessage == NULL);
ASSERT(taos_errno(NULL) == TSDB_CODE_TMQ_CONSUMER_MISMATCH);
code = tmq_consumer_close(tmq);
if (code)
fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code));
else
fprintf(stderr, "%% Consumer closed\n");
}
int main(int argc, char* argv[]) {
tmq_t* tmq = build_consumer();
tmq_list_t* topic_list = build_topic_list();
basic_consume_loop(tmq, topic_list);
tmq_list_destroy(topic_list);
}

View File

@ -181,7 +181,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
printResult(tmqmessage);
taos_free_result(tmqmessage);
} else {
ASSERT(taos_errno(NULL) == TSDB_CODE_TMQ_POLL_TIMEOUT);
ASSERT(taos_errno(NULL) == 0);
break;
}
}