fix:[TD-32471]set error code to terrno if tmq_consumer_poll return NULL

This commit is contained in:
wangmm0220 2025-01-03 11:19:40 +08:00
parent 5cdbebbcd8
commit 63cf4c9cba
6 changed files with 48 additions and 33 deletions

View File

@ -509,7 +509,7 @@ For the OpenTSDB text protocol, the parsing of timestamps follows its official p
- **Interface Description**: Used for polling to consume data. Each consumer can only call this interface in a single thread. - **Interface Description**: Used for polling to consume data. Each consumer can only call this interface in a single thread.
- tmq: [Input] Points to a valid ws_tmq_t structure pointer, which represents a TMQ consumer object. - tmq: [Input] Points to a valid ws_tmq_t structure pointer, which represents a TMQ consumer object.
- timeout: [Input] Polling timeout in milliseconds, a negative number indicates a default timeout of 1 second. - timeout: [Input] Polling timeout in milliseconds, a negative number indicates a default timeout of 1 second.
- **Return Value**: Non-`NULL`: Success, returns a pointer to a WS_RES structure, which contains the received message. `NULL`: Failure, indicates no data. WS_RES results are consistent with taos_query results, and information in WS_RES can be obtained through various query interfaces, such as schema, etc. - **Return Value**: Non-`NULL`: Success, returns a pointer to a WS_RES structure, which contains the received message. `NULL`: indicates no data, the error code can be obtained through ws_errno (NULL), please refer to the reference manual for specific error message. WS_RES results are consistent with taos_query results, and information in WS_RES can be obtained through various query interfaces, such as schema, etc.
- `int32_t ws_tmq_consumer_close(ws_tmq_t *tmq)` - `int32_t ws_tmq_consumer_close(ws_tmq_t *tmq)`
- **Interface Description**: Used to close the ws_tmq_t structure. Must be used in conjunction with ws_tmq_consumer_new. - **Interface Description**: Used to close the ws_tmq_t structure. Must be used in conjunction with ws_tmq_consumer_new.

View File

@ -534,7 +534,7 @@ This document details the server error codes that may be encountered when using
| 0x80004000 | Invalid message | The subscribed data is illegal, generally does not occur | Check the client-side error logs for details | | 0x80004000 | Invalid message | The subscribed data is illegal, generally does not occur | Check the client-side error logs for details |
| 0x80004001 | Consumer mismatch | The vnode requested for subscription and the reassigned vnode are inconsistent, usually occurs when new consumers join the same consumer group | Internal error, not exposed to users | | 0x80004001 | Consumer mismatch | The vnode requested for subscription and the reassigned vnode are inconsistent, usually occurs when new consumers join the same consumer group | Internal error, not exposed to users |
| 0x80004002 | Consumer closed | The consumer no longer exists | Check if it has already been closed | | 0x80004002 | Consumer closed | The consumer no longer exists | Check if it has already been closed |
| 0x80004100 | Stream task not exist | The stream computing task does not exist | Check the server-side error logs |
| 0x80004017 | Invalid status, please subscribe topic first | tmq status invalidate | Without calling subscribe, directly poll data | | 0x80004017 | Invalid status, please subscribe topic first | tmq status invalidate | Without calling subscribe, directly poll data |
| 0x80004018 | TMQ poll timeout | timeout is too small or there is no data to consume | Adjust the timeout parameter appropriately or check if the data has been consumed | | 0x80004018 | TMQ poll timeout | timeout is too small or there is no data to consume | Adjust the timeout parameter appropriately or check if the data has been consumed |
| 0x80004100 | Stream task not exist | The stream computing task does not exist | Check the server-side error logs |

View File

@ -509,7 +509,7 @@ TDengine 推荐数据库应用的每个线程都建立一个独立的连接,
- **接口说明**:用于轮询消费数据,每一个消费者,只能单线程调用该接口。 - **接口说明**:用于轮询消费数据,每一个消费者,只能单线程调用该接口。
- tmq[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。 - tmq[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
- timeout[入参] 轮询的超时时间单位为毫秒负数表示默认超时1秒。 - timeout[入参] 轮询的超时时间单位为毫秒负数表示默认超时1秒。
- **返回值**:非 `NULL`:成功,返回一个指向 WS_RES 结构体的指针,该结构体包含了接收到的消息。`NULL`失败,表示没有数据。WS_RES 结果和 taos_query 返回结果一致,可通过查询的各种接口获取 WS_RES 里的信息,比如 schema 等。 - **返回值**:非 `NULL`:成功,返回一个指向 WS_RES 结构体的指针,该结构体包含了接收到的消息。`NULL`:表示没有数据, 可通过 ws_errno(NULL) 获取错误码,具体错误码参见参考手册。WS_RES 结果和 taos_query 返回结果一致,可通过查询的各种接口获取 WS_RES 里的信息,比如 schema 等。
- `int32_t ws_tmq_consumer_close(ws_tmq_t *tmq)` - `int32_t ws_tmq_consumer_close(ws_tmq_t *tmq)`
- **接口说明**:用于关闭 ws_tmq_t 结构体。需与 ws_tmq_consumer_new 配合使用。 - **接口说明**:用于关闭 ws_tmq_t 结构体。需与 ws_tmq_consumer_new 配合使用。

View File

@ -554,7 +554,7 @@ description: TDengine 服务端的错误码列表和详细说明
| 0x80004000 | Invalid message | 订阅到的数据非法,一般不会出现 | 具体查看client端的错误日志提示 | | 0x80004000 | Invalid message | 订阅到的数据非法,一般不会出现 | 具体查看client端的错误日志提示 |
| 0x80004001 | Consumer mismatch | 订阅请求的vnode和重新分配的vnode不一致一般存在于有新消费者加入相同消费者组里时 | 内部错误,不暴露给用户 | | 0x80004001 | Consumer mismatch | 订阅请求的vnode和重新分配的vnode不一致一般存在于有新消费者加入相同消费者组里时 | 内部错误,不暴露给用户 |
| 0x80004002 | Consumer closed | 消费者已经不存在了 | 查看是否已经close掉了 | | 0x80004002 | Consumer closed | 消费者已经不存在了 | 查看是否已经close掉了 |
| 0x80004100 | Stream task not exist | 流计算任务不存在 | 具体查看server端的错误日志 |
| 0x80004017 | Invalid status, please subscribe topic first | 数据订阅状态不对 | 没有调用 subscribe直接poll数据 | | 0x80004017 | Invalid status, please subscribe topic first | 数据订阅状态不对 | 没有调用 subscribe直接poll数据 |
| 0x80004018 | TMQ poll timeout | 数据订阅超时,超时时间太短,或者数据消费完毕 | 可适当调大timeout 参数或者检测数据是否消费完毕 | | 0x80004018 | TMQ poll timeout | 数据订阅超时,超时时间太短,或者数据消费完毕 | 可适当调大timeout 参数或者检测数据是否消费完毕 |
| 0x80004100 | Stream task not exist | 流计算任务不存在 | 具体查看server端的错误日志 |

View File

@ -2339,9 +2339,6 @@ static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
SMqBatchMetaRsp batchMetaRsp; SMqBatchMetaRsp batchMetaRsp;
} MEMSIZE; } MEMSIZE;
if (pollRspWrapper == NULL) {
return NULL;
}
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
if (pRspObj == NULL) { if (pRspObj == NULL) {
tqErrorC("buildRsp:failed to allocate memory"); tqErrorC("buildRsp:failed to allocate memory");
@ -2383,15 +2380,17 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
} }
taosWUnLockLatch(&tmq->lock); taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_SUCCESS; return code;
} }
static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
int32_t code = 0;
SMqRspObj* pRspObj = NULL; SMqRspObj* pRspObj = NULL;
if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId); tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId);
SMqAskEpRsp* rspMsg = &pRspWrapper->epRsp; SMqAskEpRsp* rspMsg = &pRspWrapper->epRsp;
doUpdateLocalEp(tmq, pRspWrapper->epoch, rspMsg); doUpdateLocalEp(tmq, pRspWrapper->epoch, rspMsg);
terrno = code;
return pRspObj; return pRspObj;
} }
@ -2402,7 +2401,7 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
if(pVg == NULL) { if(pVg == NULL) {
tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
pollRspWrapper->topicName, pollRspWrapper->vgId); pollRspWrapper->topicName, pollRspWrapper->vgId);
terrno = TSDB_CODE_TMQ_INVALID_VGID; code = TSDB_CODE_TMQ_INVALID_VGID;
goto END; goto END;
} }
pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
@ -2461,6 +2460,7 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
} }
END: END:
terrno = code;
taosWUnLockLatch(&tmq->lock); taosWUnLockLatch(&tmq->lock);
return pRspObj; return pRspObj;
} }
@ -2468,65 +2468,72 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
static void* tmqHandleAllRsp(tmq_t* tmq) { static void* tmqHandleAllRsp(tmq_t* tmq) {
tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall)); tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall));
int32_t code = 0;
void* returnVal = NULL; void* returnVal = NULL;
while (1) { while (1) {
SMqRspWrapper* pRspWrapper = NULL; SMqRspWrapper* pRspWrapper = NULL;
if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) { if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) {
if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){ code = taosReadAllQitems(tmq->mqueue, tmq->qall);
return NULL; if (code == 0){
goto END;
} }
if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) { code = taosGetQitem(tmq->qall, (void**)&pRspWrapper);
return NULL; if (code == 0) {
goto END;
} }
} }
tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]); tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]);
if (pRspWrapper->code != 0) { if (pRspWrapper->code != 0) {
terrno = processMqRspError(tmq, pRspWrapper); code = processMqRspError(tmq, pRspWrapper);
}else{ }else{
returnVal = processMqRsp(tmq, pRspWrapper); returnVal = processMqRsp(tmq, pRspWrapper);
code = terrno;
} }
tmqFreeRspWrapper(pRspWrapper); tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pRspWrapper); taosFreeQitem(pRspWrapper);
if(returnVal != NULL || terrno != 0){ if(returnVal != NULL || code != 0){
break; break;
} }
} }
END:
terrno = code;
return returnVal; return returnVal;
} }
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
int32_t lino = 0; int32_t lino = 0;
terrno = TSDB_CODE_SUCCESS; int32_t code = 0;
TSDB_CHECK_NULL(tmq, terrno, lino, END, TSDB_CODE_INVALID_PARA); TSDB_CHECK_NULL(tmq, code, lino, END, TSDB_CODE_INVALID_PARA);
void* rspObj = NULL; void* rspObj = NULL;
int64_t startTime = taosGetTimestampMs(); int64_t startTime = taosGetTimestampMs();
tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, timeout); tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, timeout);
TSDB_CHECK_CONDITION(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__INIT, terrno, lino, END, TSDB_CODE_TMQ_INVALID_STATUS); TSDB_CHECK_CONDITION(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__INIT, code, lino, END, TSDB_CODE_TMQ_INVALID_STATUS);
(void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1); (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1);
while (1) { while (1) {
terrno = tmqHandleAllDelayedTask(tmq); code = tmqHandleAllDelayedTask(tmq);
TSDB_CHECK_CODE(terrno, lino, END); TSDB_CHECK_CODE(code, lino, END);
terrno = tmqPollImpl(tmq, timeout); code = tmqPollImpl(tmq, timeout);
TSDB_CHECK_CODE(terrno, lino, END); TSDB_CHECK_CODE(code, lino, END);
rspObj = tmqHandleAllRsp(tmq); rspObj = tmqHandleAllRsp(tmq);
if (rspObj) { if (rspObj) {
tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
return (TAOS_RES*)rspObj; return (TAOS_RES*)rspObj;
} }
TSDB_CHECK_CODE(terrno, lino, END); code = terrno;
TSDB_CHECK_CODE(code, lino, END);
if (timeout >= 0) { if (timeout >= 0) {
int64_t currentTime = taosGetTimestampMs(); int64_t currentTime = taosGetTimestampMs();
int64_t elapsedTime = currentTime - startTime; int64_t elapsedTime = currentTime - startTime;
TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, terrno, lino, END, TSDB_CODE_TMQ_POLL_TIMEOUT); TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, code, lino, END, TSDB_CODE_TMQ_POLL_TIMEOUT);
(void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime)); (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime));
} else { } else {
(void)tsem2_timewait(&tmq->rspSem, 1000); (void)tsem2_timewait(&tmq->rspSem, 1000);
@ -2534,6 +2541,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
} }
END: END:
terrno = code;
if (tmq != NULL) { if (tmq != NULL) {
tqErrorC("consumer:0x%" PRIx64 " poll error at line:%d, msg:%s", tmq->consumerId, lino, tstrerror(terrno)); tqErrorC("consumer:0x%" PRIx64 " poll error at line:%d, msg:%s", tmq->consumerId, lino, tstrerror(terrno));
} }

View File

@ -294,7 +294,10 @@ void *taosMemCalloc(int64_t num, int64_t size) {
#ifdef USE_TD_MEMORY #ifdef USE_TD_MEMORY
int32_t memorySize = num * size; int32_t memorySize = num * size;
char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1); char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1);
if (tmp == NULL) return NULL; if (tmp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp; TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp;
pTdMemoryInfo->memorySize = memorySize; pTdMemoryInfo->memorySize = memorySize;
@ -328,6 +331,7 @@ void *taosMemRealloc(void *ptr, int64_t size) {
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
if (tpTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) { if (tpTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL; return NULL;
} }
@ -335,7 +339,10 @@ void *taosMemRealloc(void *ptr, int64_t size) {
memcpy(&tdMemoryInfo, pTdMemoryInfo, sizeof(TdMemoryInfo)); memcpy(&tdMemoryInfo, pTdMemoryInfo, sizeof(TdMemoryInfo));
void *tmp = realloc(pTdMemoryInfo, size + sizeof(TdMemoryInfo)); void *tmp = realloc(pTdMemoryInfo, size + sizeof(TdMemoryInfo));
if (tmp == NULL) return NULL; if (tmp == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
memcpy(tmp, &tdMemoryInfo, sizeof(TdMemoryInfo)); memcpy(tmp, &tdMemoryInfo, sizeof(TdMemoryInfo));
((TdMemoryInfoPtr)tmp)->memorySize = size; ((TdMemoryInfoPtr)tmp)->memorySize = size;