diff --git a/docs/en/14-reference/05-connector/10-cpp.md b/docs/en/14-reference/05-connector/10-cpp.md index 76488442e2..24578b3220 100644 --- a/docs/en/14-reference/05-connector/10-cpp.md +++ b/docs/en/14-reference/05-connector/10-cpp.md @@ -509,7 +509,7 @@ For the OpenTSDB text protocol, the parsing of timestamps follows its official p - **Interface Description**: Used for polling to consume data. Each consumer can only call this interface in a single thread. - tmq: [Input] Points to a valid ws_tmq_t structure pointer, which represents a TMQ consumer object. - timeout: [Input] Polling timeout in milliseconds, a negative number indicates a default timeout of 1 second. - - **Return Value**: Non-`NULL`: Success, returns a pointer to a WS_RES structure, which contains the received message. `NULL`: Failure, indicates no data. WS_RES results are consistent with taos_query results, and information in WS_RES can be obtained through various query interfaces, such as schema, etc. + - **Return Value**: Non-`NULL`: Success, returns a pointer to a WS_RES structure, which contains the received message. `NULL`: indicates no data, the error code can be obtained through ws_errno (NULL), please refer to the reference manual for specific error message. WS_RES results are consistent with taos_query results, and information in WS_RES can be obtained through various query interfaces, such as schema, etc. - `int32_t ws_tmq_consumer_close(ws_tmq_t *tmq)` - **Interface Description**: Used to close the ws_tmq_t structure. Must be used in conjunction with ws_tmq_consumer_new. diff --git a/docs/en/14-reference/09-error-code.md b/docs/en/14-reference/09-error-code.md index 9c0a0be377..6737fd1124 100644 --- a/docs/en/14-reference/09-error-code.md +++ b/docs/en/14-reference/09-error-code.md @@ -534,7 +534,7 @@ This document details the server error codes that may be encountered when using | 0x80004000 | Invalid message | The subscribed data is illegal, generally does not occur | Check the client-side error logs for details | | 0x80004001 | Consumer mismatch | The vnode requested for subscription and the reassigned vnode are inconsistent, usually occurs when new consumers join the same consumer group | Internal error, not exposed to users | | 0x80004002 | Consumer closed | The consumer no longer exists | Check if it has already been closed | -| 0x80004100 | Stream task not exist | The stream computing task does not exist | Check the server-side error logs | | 0x80004017 | Invalid status, please subscribe topic first | tmq status invalidate | Without calling subscribe, directly poll data | | 0x80004018 | TMQ poll timeout | timeout is too small or there is no data to consume | Adjust the timeout parameter appropriately or check if the data has been consumed | +| 0x80004100 | Stream task not exist | The stream computing task does not exist | Check the server-side error logs | diff --git a/docs/zh/14-reference/05-connector/10-cpp.mdx b/docs/zh/14-reference/05-connector/10-cpp.mdx index 1bf3348a21..a23d770998 100644 --- a/docs/zh/14-reference/05-connector/10-cpp.mdx +++ b/docs/zh/14-reference/05-connector/10-cpp.mdx @@ -509,7 +509,7 @@ TDengine 推荐数据库应用的每个线程都建立一个独立的连接, - **接口说明**:用于轮询消费数据,每一个消费者,只能单线程调用该接口。 - tmq:[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。 - timeout:[入参] 轮询的超时时间,单位为毫秒,负数表示默认超时1秒。 - - **返回值**:非 `NULL`:成功,返回一个指向 WS_RES 结构体的指针,该结构体包含了接收到的消息。`NULL`:失败,表示没有数据。WS_RES 结果和 taos_query 返回结果一致,可通过查询的各种接口获取 WS_RES 里的信息,比如 schema 等。 + - **返回值**:非 `NULL`:成功,返回一个指向 WS_RES 结构体的指针,该结构体包含了接收到的消息。`NULL`:表示没有数据, 可通过 ws_errno(NULL) 获取错误码,具体错误码参见参考手册。WS_RES 结果和 taos_query 返回结果一致,可通过查询的各种接口获取 WS_RES 里的信息,比如 schema 等。 - `int32_t ws_tmq_consumer_close(ws_tmq_t *tmq)` - **接口说明**:用于关闭 ws_tmq_t 结构体。需与 ws_tmq_consumer_new 配合使用。 diff --git a/docs/zh/14-reference/09-error-code.md b/docs/zh/14-reference/09-error-code.md index 16f0b7ebf4..d2b48d509b 100644 --- a/docs/zh/14-reference/09-error-code.md +++ b/docs/zh/14-reference/09-error-code.md @@ -554,7 +554,7 @@ description: TDengine 服务端的错误码列表和详细说明 | 0x80004000 | Invalid message | 订阅到的数据非法,一般不会出现 | 具体查看client端的错误日志提示 | | 0x80004001 | Consumer mismatch | 订阅请求的vnode和重新分配的vnode不一致,一般存在于有新消费者加入相同消费者组里时 | 内部错误,不暴露给用户 | | 0x80004002 | Consumer closed | 消费者已经不存在了 | 查看是否已经close掉了 | -| 0x80004100 | Stream task not exist | 流计算任务不存在 | 具体查看server端的错误日志 | | 0x80004017 | Invalid status, please subscribe topic first | 数据订阅状态不对 | 没有调用 subscribe,直接poll数据 | | 0x80004018 | TMQ poll timeout | 数据订阅超时,超时时间太短,或者数据消费完毕 | 可适当调大timeout 参数或者检测数据是否消费完毕 | +| 0x80004100 | Stream task not exist | 流计算任务不存在 | 具体查看server端的错误日志 | diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 670398d1f5..90aa2d8c65 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1209,9 +1209,9 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic } static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* pRsp){ - if (tmq == NULL || newTopics == NULL || pRsp == NULL) { - return; - } + if (tmq == NULL || newTopics == NULL || pRsp == NULL) { + return; + } SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); if (pVgOffsetHashMap == NULL) { tqErrorC("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno); @@ -1266,9 +1266,9 @@ static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* } static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { - if (tmq == NULL || pRsp == NULL) { - return; - } + if (tmq == NULL || pRsp == NULL) { + return; + } int32_t topicNumGet = taosArrayGetSize(pRsp->topics); // vnode transform (epoch == tmq->epoch && topicNumGet != 0) // ask ep rsp (epoch == tmq->epoch && topicNumGet == 0) @@ -2339,9 +2339,6 @@ static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){ SMqBatchMetaRsp batchMetaRsp; } MEMSIZE; - if (pollRspWrapper == NULL) { - return NULL; - } SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); if (pRspObj == NULL) { tqErrorC("buildRsp:failed to allocate memory"); @@ -2383,15 +2380,17 @@ static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ } taosWUnLockLatch(&tmq->lock); - return TSDB_CODE_SUCCESS; + return code; } static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ + int32_t code = 0; SMqRspObj* pRspObj = NULL; if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId); SMqAskEpRsp* rspMsg = &pRspWrapper->epRsp; doUpdateLocalEp(tmq, pRspWrapper->epoch, rspMsg); + terrno = code; return pRspObj; } @@ -2402,7 +2401,7 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ if(pVg == NULL) { tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); - terrno = TSDB_CODE_TMQ_INVALID_VGID; + code = TSDB_CODE_TMQ_INVALID_VGID; goto END; } pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); @@ -2460,7 +2459,8 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP ? RES_TYPE__TMQ_META : RES_TYPE__TMQ_BATCH_META; } - END: +END: + terrno = code; taosWUnLockLatch(&tmq->lock); return pRspObj; } @@ -2468,65 +2468,72 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ static void* tmqHandleAllRsp(tmq_t* tmq) { tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall)); + int32_t code = 0; void* returnVal = NULL; while (1) { SMqRspWrapper* pRspWrapper = NULL; if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) { - if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){ - return NULL; + code = taosReadAllQitems(tmq->mqueue, tmq->qall); + if (code == 0){ + goto END; } - if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) { - return NULL; + code = taosGetQitem(tmq->qall, (void**)&pRspWrapper); + if (code == 0) { + goto END; } } tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]); if (pRspWrapper->code != 0) { - terrno = processMqRspError(tmq, pRspWrapper); + code = processMqRspError(tmq, pRspWrapper); }else{ returnVal = processMqRsp(tmq, pRspWrapper); + code = terrno; } tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); - if(returnVal != NULL || terrno != 0){ + if(returnVal != NULL || code != 0){ break; } } +END: + terrno = code; return returnVal; } TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { int32_t lino = 0; - terrno = TSDB_CODE_SUCCESS; - TSDB_CHECK_NULL(tmq, terrno, lino, END, TSDB_CODE_INVALID_PARA); + int32_t code = 0; + TSDB_CHECK_NULL(tmq, code, lino, END, TSDB_CODE_INVALID_PARA); void* rspObj = NULL; int64_t startTime = taosGetTimestampMs(); tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, timeout); - TSDB_CHECK_CONDITION(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__INIT, terrno, lino, END, TSDB_CODE_TMQ_INVALID_STATUS); + TSDB_CHECK_CONDITION(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__INIT, code, lino, END, TSDB_CODE_TMQ_INVALID_STATUS); (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1); while (1) { - terrno = tmqHandleAllDelayedTask(tmq); - TSDB_CHECK_CODE(terrno, lino, END); + code = tmqHandleAllDelayedTask(tmq); + TSDB_CHECK_CODE(code, lino, END); - terrno = tmqPollImpl(tmq, timeout); - TSDB_CHECK_CODE(terrno, lino, END); + code = tmqPollImpl(tmq, timeout); + TSDB_CHECK_CODE(code, lino, END); rspObj = tmqHandleAllRsp(tmq); if (rspObj) { tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); return (TAOS_RES*)rspObj; } - TSDB_CHECK_CODE(terrno, lino, END); + code = terrno; + TSDB_CHECK_CODE(code, lino, END); if (timeout >= 0) { int64_t currentTime = taosGetTimestampMs(); int64_t elapsedTime = currentTime - startTime; - TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, terrno, lino, END, TSDB_CODE_TMQ_POLL_TIMEOUT); + TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, code, lino, END, TSDB_CODE_TMQ_POLL_TIMEOUT); (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime)); } else { (void)tsem2_timewait(&tmq->rspSem, 1000); @@ -2534,6 +2541,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { } END: + terrno = code; if (tmq != NULL) { tqErrorC("consumer:0x%" PRIx64 " poll error at line:%d, msg:%s", tmq->consumerId, lino, tstrerror(terrno)); } diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index fdbf4853ad..51454653ca 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -294,7 +294,10 @@ void *taosMemCalloc(int64_t num, int64_t size) { #ifdef USE_TD_MEMORY int32_t memorySize = num * size; char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1); - if (tmp == NULL) return NULL; + if (tmp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp; pTdMemoryInfo->memorySize = memorySize; @@ -328,6 +331,7 @@ void *taosMemRealloc(void *ptr, int64_t size) { TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); if (tpTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -335,7 +339,10 @@ void *taosMemRealloc(void *ptr, int64_t size) { memcpy(&tdMemoryInfo, pTdMemoryInfo, sizeof(TdMemoryInfo)); void *tmp = realloc(pTdMemoryInfo, size + sizeof(TdMemoryInfo)); - if (tmp == NULL) return NULL; + if (tmp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } memcpy(tmp, &tdMemoryInfo, sizeof(TdMemoryInfo)); ((TdMemoryInfoPtr)tmp)->memorySize = size;