From ba9892bae0d3cb9f80c7bd1e9b31570b7a931754 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Wed, 8 Jan 2025 16:50:38 +0800 Subject: [PATCH] fix:[TD-32471]set error code to terrno if tmq_consumer_poll return NULL --- docs/en/14-reference/05-connector/10-cpp.md | 8 +- docs/en/14-reference/09-error-code.md | 1 + docs/zh/14-reference/05-connector/10-cpp.mdx | 4 +- docs/zh/14-reference/09-error-code.md | 1 + include/common/tcommon.h | 4 + include/util/taoserror.h | 2 + source/client/src/clientRawBlockWrite.c | 1 + source/client/src/clientTmq.c | 276 +++++++++---------- source/dnode/mnode/impl/src/mndConsumer.c | 2 +- source/dnode/vnode/src/inc/tq.h | 4 +- source/dnode/vnode/src/tq/tqScan.c | 46 ++-- source/dnode/vnode/src/tq/tqUtil.c | 20 +- source/os/src/osMemory.c | 11 +- source/util/src/terror.c | 2 + tests/parallel_test/cases.task | 1 + tests/system-test/7-tmq/tmq_td32471.py | 54 ++++ utils/test/c/CMakeLists.txt | 8 + utils/test/c/tmq_td32471.c | 93 +++++++ utils/test/c/tmq_td32526.c | 1 + 19 files changed, 350 insertions(+), 189 deletions(-) create mode 100644 tests/system-test/7-tmq/tmq_td32471.py create mode 100644 utils/test/c/tmq_td32471.c diff --git a/docs/en/14-reference/05-connector/10-cpp.md b/docs/en/14-reference/05-connector/10-cpp.md index 940d4c359e..3b51b47461 100644 --- a/docs/en/14-reference/05-connector/10-cpp.md +++ b/docs/en/14-reference/05-connector/10-cpp.md @@ -509,8 +509,8 @@ For the OpenTSDB text protocol, the parsing of timestamps follows its official p - **Interface Description**: Used for polling to consume data. Each consumer can only call this interface in a single thread. - tmq: [Input] Points to a valid ws_tmq_t structure pointer, which represents a TMQ consumer object. - timeout: [Input] Polling timeout in milliseconds, a negative number indicates a default timeout of 1 second. - - **Return Value**: Non-`NULL`: Success, returns a pointer to a WS_RES structure, which contains the received message. `NULL`: Failure, indicates no data. WS_RES results are consistent with taos_query results, and information in WS_RES can be obtained through various query interfaces, such as schema, etc. - + - **Return Value**: Non-`NULL`: Success, returns a pointer to a WS_RES structure, which contains the received message. `NULL`: indicates no data, the error code can be obtained through ws_errno (NULL), please refer to the reference manual for specific error message. WS_RES results are consistent with taos_query results, and information in WS_RES can be obtained through various query interfaces, such as schema, etc. + - `int32_t ws_tmq_consumer_close(ws_tmq_t *tmq)` - **Interface Description**: Used to close the ws_tmq_t structure. Must be used in conjunction with ws_tmq_consumer_new. - tmq: [Input] Points to a valid ws_tmq_t structure pointer, which represents a TMQ consumer object. @@ -1195,8 +1195,8 @@ In addition to using SQL or parameter binding APIs to insert data, you can also - **Interface Description**: Used to poll for consuming data, each consumer can only call this interface in a single thread. - tmq: [Input] Points to a valid tmq_t structure pointer, representing a TMQ consumer object. - timeout: [Input] Polling timeout in milliseconds, a negative number indicates a default timeout of 1 second. - - **Return Value**: Non-`NULL`: Success, returns a pointer to a TAOS_RES structure containing the received messages. `NULL`: Failure, indicates no data. TAOS_RES results are consistent with taos_query results, and information in TAOS_RES can be obtained through various query interfaces, such as schema, etc. - + - **Return Value**: Non-`NULL`: Success, returns a pointer to a TAOS_RES structure containing the received messages. `NULL`: indicates no data, the error code can be obtained through taos_errno (NULL), please refer to the reference manual for specific error message. TAOS_RES results are consistent with taos_query results, and information in TAOS_RES can be obtained through various query interfaces, such as schema, etc. + - `int32_t tmq_consumer_close(tmq_t *tmq)` - **Interface Description**: Used to close a tmq_t structure. Must be used in conjunction with tmq_consumer_new. - tmq: [Input] Points to a valid tmq_t structure pointer, which represents a TMQ consumer object. diff --git a/docs/en/14-reference/09-error-code.md b/docs/en/14-reference/09-error-code.md index 2bbd8f9305..190c626196 100644 --- a/docs/en/14-reference/09-error-code.md +++ b/docs/en/14-reference/09-error-code.md @@ -534,4 +534,5 @@ This document details the server error codes that may be encountered when using | 0x80004000 | Invalid message | The subscribed data is illegal, generally does not occur | Check the client-side error logs for details | | 0x80004001 | Consumer mismatch | The vnode requested for subscription and the reassigned vnode are inconsistent, usually occurs when new consumers join the same consumer group | Internal error, not exposed to users | | 0x80004002 | Consumer closed | The consumer no longer exists | Check if it has already been closed | +| 0x80004017 | Invalid status, please subscribe topic first | tmq status invalidate | Without calling subscribe, directly poll data | | 0x80004100 | Stream task not exist | The stream computing task does not exist | Check the server-side error logs | diff --git a/docs/zh/14-reference/05-connector/10-cpp.mdx b/docs/zh/14-reference/05-connector/10-cpp.mdx index f6557bb388..d18ddb1b3e 100644 --- a/docs/zh/14-reference/05-connector/10-cpp.mdx +++ b/docs/zh/14-reference/05-connector/10-cpp.mdx @@ -509,7 +509,7 @@ TDengine 推荐数据库应用的每个线程都建立一个独立的连接, - **接口说明**:用于轮询消费数据,每一个消费者,只能单线程调用该接口。 - tmq:[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。 - timeout:[入参] 轮询的超时时间,单位为毫秒,负数表示默认超时1秒。 - - **返回值**:非 `NULL`:成功,返回一个指向 WS_RES 结构体的指针,该结构体包含了接收到的消息。`NULL`:失败,表示没有数据。WS_RES 结果和 taos_query 返回结果一致,可通过查询的各种接口获取 WS_RES 里的信息,比如 schema 等。 + - **返回值**:非 `NULL`:成功,返回一个指向 WS_RES 结构体的指针,该结构体包含了接收到的消息。`NULL`:表示没有数据, 可通过 ws_errno(NULL) 获取错误码,具体错误码参见参考手册。WS_RES 结果和 taos_query 返回结果一致,可通过查询的各种接口获取 WS_RES 里的信息,比如 schema 等。 - `int32_t ws_tmq_consumer_close(ws_tmq_t *tmq)` - **接口说明**:用于关闭 ws_tmq_t 结构体。需与 ws_tmq_consumer_new 配合使用。 @@ -1189,7 +1189,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多 - `int32_t tmq_consumer_close(tmq_t *tmq)` - **接口说明**:用于关闭 tmq_t 结构体。需与 tmq_consumer_new 配合使用。 - tmq:[入参] 指向一个有效的 tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。 - - **返回值**:`0`:成功。非 `0`:失败,可调用函数 `char *tmq_err2str(int32_t code)` 获取更详细的错误信息。 + - **返回值**:非 `NULL`:成功,返回一个指向 TAOS_RES 结构体的指针,该结构体包含了接收到的消息。。`NULL`:表示没有数据,可通过taos_errno(NULL) 获取错误码,具体错误码参见参考手册。TAOS_RES 结果和 taos_query 返回结果一致,可通过查询的各种接口获取 TAOS_RES 里的信息,比如 schema 等。 - `int32_t tmq_get_topic_assignment(tmq_t *tmq, const char *pTopicName, tmq_topic_assignment **assignment, int32_t *numOfAssignment)` - **接口说明**:返回当前 consumer 分配的 vgroup 的信息,每个 vgroup 的信息包括 vgId,wal 的最大最小 offset,以及当前消费到的 offset。 diff --git a/docs/zh/14-reference/09-error-code.md b/docs/zh/14-reference/09-error-code.md index 51453cef4c..7f4d36b4b2 100644 --- a/docs/zh/14-reference/09-error-code.md +++ b/docs/zh/14-reference/09-error-code.md @@ -554,5 +554,6 @@ description: TDengine 服务端的错误码列表和详细说明 | 0x80004000 | Invalid message | 订阅到的数据非法,一般不会出现 | 具体查看client端的错误日志提示 | | 0x80004001 | Consumer mismatch | 订阅请求的vnode和重新分配的vnode不一致,一般存在于有新消费者加入相同消费者组里时 | 内部错误,不暴露给用户 | | 0x80004002 | Consumer closed | 消费者已经不存在了 | 查看是否已经close掉了 | +| 0x80004017 | Invalid status, please subscribe topic first | 数据订阅状态不对 | 没有调用 subscribe,直接poll数据 | | 0x80004100 | Stream task not exist | 流计算任务不存在 | 具体查看server端的错误日志 | diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 61cd482c70..0450766535 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -123,6 +123,10 @@ enum { TMQ_MSG_TYPE__POLL_BATCH_META_RSP, }; +static char* tmqMsgTypeStr[] = { + "data", "meta", "ask ep", "meta data", "wal info", "batch meta" +}; + enum { STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_BLOCK, diff --git a/include/util/taoserror.h b/include/util/taoserror.h index e317fdd65a..a24b3ca7cf 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -47,6 +47,7 @@ const char* terrstr(); char* taosGetErrMsgReturn(); char* taosGetErrMsg(); +void taosClearErrMsg(); int32_t* taosGetErrno(); int32_t* taosGetErrln(); int32_t taosGetErrSize(); @@ -1013,6 +1014,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014) #define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015) #define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016) +#define TSDB_CODE_TMQ_INVALID_STATUS TAOS_DEF_ERROR_CODE(0, 0x4017) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 613645c4cd..c200d38a56 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -2572,6 +2572,7 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw); return TSDB_CODE_INVALID_PARA; } + taosClearErrMsg(); // clear global error message return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type); } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0cbdfc13e0..f49adb83f3 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -64,6 +64,7 @@ enum { enum { TMQ_CONSUMER_STATUS__INIT = 0, TMQ_CONSUMER_STATUS__READY, + TMQ_CONSUMER_STATUS__LOST, TMQ_CONSUMER_STATUS__CLOSED, }; @@ -768,7 +769,7 @@ static int32_t innerCommit(tmq_t* tmq, char* pTopicName, STqOffsetVal* offsetVal } tqDebugC("consumer:0x%" PRIx64 " topic:%s on vgId:%d send commit msg success, send offset:%s committed:%s", - tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf); + tmq->consumerId, pTopicName, pVg->vgId, offsetBuf, commitBuf); tOffsetCopy(&pVg->offsetInfo.committedOffset, offsetVal); return code; } @@ -823,7 +824,7 @@ static void asyncCommitFromResult(tmq_t* tmq, const TAOS_RES* pRes, tmq_commit_c code = asyncCommitOffset(tmq, pTopicName, vgId, &offsetVal, pCommitFp, userParam); -end: + end: if (code != TSDB_CODE_SUCCESS && pCommitFp != NULL) { if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS; pCommitFp(tmq, code, userParam); @@ -863,7 +864,7 @@ static int32_t innerCommitAll(tmq_t* tmq, SMqCommitCbParamSet* pParamSet){ } tqDebugC("consumer:0x%" PRIx64 " total commit:%d for %d topics", tmq->consumerId, pParamSet->waitingRspNum - DEFAULT_COMMIT_CNT, numOfTopics); -END: + END: taosRUnLockLatch(&tmq->lock); return code; } @@ -988,7 +989,7 @@ int32_t tmqHbCb(void* param, SDataBuf* pMsg, int32_t code) { tDestroySMqHbRsp(&rsp); -END: + END: taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); return code; @@ -1088,7 +1089,7 @@ void tmqSendHbReq(void* param, void* tmrId) { } (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 1, 0); -END: + END: tDestroySMqHbReq(&req); if (tmrId != NULL) { bool ret = taosTmrReset(tmqSendHbReq, tmq->heartBeatIntervalMs, param, tmqMgmt.timer, &tmq->hbLiveTimer); @@ -1209,9 +1210,9 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic } static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* pRsp){ - if (tmq == NULL || newTopics == NULL || pRsp == NULL) { - return; - } + if (tmq == NULL || newTopics == NULL || pRsp == NULL) { + return; + } SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK); if (pVgOffsetHashMap == NULL) { tqErrorC("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno); @@ -1266,9 +1267,9 @@ static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* } static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) { - if (tmq == NULL || pRsp == NULL) { - return; - } + if (tmq == NULL || pRsp == NULL) { + return; + } int32_t topicNumGet = taosArrayGetSize(pRsp->topics); // vnode transform (epoch == tmq->epoch && topicNumGet != 0) // ask ep rsp (epoch == tmq->epoch && topicNumGet == 0) @@ -1318,6 +1319,9 @@ static int32_t askEpCb(void* param, SDataBuf* pMsg, int32_t code) { if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_MND_CONSUMER_NOT_READY){ tqErrorC("consumer:0x%" PRIx64 ", get topic endpoint error, code:%s", tmq->consumerId, tstrerror(code)); + if (code == TSDB_CODE_MND_CONSUMER_NOT_EXIST){ + atomic_store_8(&tmq->status, TMQ_CONSUMER_STATUS__LOST); + } } goto END; } @@ -1388,49 +1392,32 @@ static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { if (pTmq == NULL) { return TSDB_CODE_INVALID_PARA; } + int32_t code = 0; + int32_t lino = 0; SMqAskEpReq req = {0}; req.consumerId = pTmq->consumerId; req.epoch = updateEpSet ? -1 : pTmq->epoch; tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN); - int code = 0; SMqAskEpCbParam* pParam = NULL; void* pReq = NULL; int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req); - if (tlen < 0) { - tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId); - return TSDB_CODE_INVALID_PARA; - } - + TSDB_CHECK_CONDITION(tlen >= 0, code, lino, END, TSDB_CODE_INVALID_PARA); pReq = taosMemoryCalloc(1, tlen); - if (pReq == NULL) { - tqErrorC("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen); - return terrno; - } + TSDB_CHECK_NULL(pReq, code, lino, END, terrno); - if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) { - tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen); - taosMemoryFree(pReq); - return TSDB_CODE_INVALID_PARA; - } + code = tSerializeSMqAskEpReq(pReq, tlen, &req); + TSDB_CHECK_CONDITION(code >= 0, code, lino, END, TSDB_CODE_INVALID_PARA); pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); - if (pParam == NULL) { - tqErrorC("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId); - taosMemoryFree(pReq); - return terrno; - } + TSDB_CHECK_NULL(pParam, code, lino, END, terrno); pParam->refId = pTmq->refId; pParam->sync = sync; pParam->pParam = param; SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (sendInfo == NULL) { - taosMemoryFree(pReq); - taosMemoryFree(pParam); - return terrno; - } + TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno); sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL}; sendInfo->requestId = generateRequestId(); @@ -1440,28 +1427,36 @@ static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { sendInfo->fp = askEpCb; sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; + pReq = NULL; + pParam = NULL; + SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp); tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode,QID:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); - return asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); + code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); + + END: + if (code != 0) { + tqErrorC("%s failed at %d, msg:%s", __func__, lino, tstrerror(code)); + } + taosMemoryFree(pReq); + taosMemoryFree(pParam); + return code; } -void tmqHandleAllDelayedTask(tmq_t* pTmq) { - if (pTmq == NULL) { - return; - } +static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { STaosQall* qall = NULL; int32_t code = 0; code = taosAllocateQall(&qall); if (code) { tqErrorC("consumer:0x%" PRIx64 ", failed to allocate qall, code:%s", pTmq->consumerId, tstrerror(code)); - return; + return code; } int32_t numOfItems = taosReadAllQitems(pTmq->delayedTask, qall); if (numOfItems == 0) { taosFreeQall(qall); - return; + return 0; } tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems); @@ -1472,7 +1467,6 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) { code = askEp(pTmq, NULL, false, false); if (code != 0) { tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code)); - continue; } tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer, @@ -1494,6 +1488,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) { } taosFreeQall(qall); + return 0; } void tmqClearUnhandleMsg(tmq_t* tmq) { @@ -1562,7 +1557,7 @@ int32_t tmq_subscription(tmq_t* tmq, tmq_list_t** topics) { } void tmqFreeImpl(void* handle) { - if (handle == NULL) return; + if (handle == NULL) return; tmq_t* tmq = (tmq_t*)handle; int64_t id = tmq->consumerId; @@ -1759,13 +1754,13 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) { STqOffsetVal offset = {.type = pTmq->resetOffsetCfg}; tFormatOffset(buf, tListLen(buf), &offset); tqInfoC("consumer:0x%" PRIx64 " is setup, refId:%" PRId64 - ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s", + ", groupId:%s, snapshot:%d, autoCommit:%d, commitInterval:%dms, offset:%s", pTmq->consumerId, pTmq->refId, pTmq->groupId, pTmq->useSnapshot, pTmq->autoCommit, pTmq->autoCommitInterval, buf); return pTmq; -_failed: + _failed: tmqFreeImpl(pTmq); return NULL; } @@ -1942,7 +1937,7 @@ int32_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) { } } -END: + END: taosArrayDestroyP(req.topicNames, NULL); return code; } @@ -2032,7 +2027,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { if (msgEpoch != clientEpoch) { tqErrorC("consumer:0x%" PRIx64 - " msg discard from vgId:%d since epoch not equal, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64, + " msg discard from vgId:%d since epoch not equal, rsp epoch %d, current epoch %d, reqId:0x%" PRIx64, tmq->consumerId, vgId, msgEpoch, clientEpoch, requestId); code = TSDB_CODE_TMQ_CONSUMER_MISMATCH; goto END; @@ -2057,7 +2052,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { pRspWrapper->pollRsp.pEpset = pMsg->pEpSet; pMsg->pEpSet = NULL; -END: + END: if (pRspWrapper) { pRspWrapper->code = code; pRspWrapper->pollRsp.vgId = vgId; @@ -2082,7 +2077,7 @@ END: tqErrorC("failed to release ref:%"PRId64 ", code:%d", refId, ret); } -EXIT: + EXIT: taosMemoryFreeClear(pMsg->pData); taosMemoryFreeClear(pMsg->pEpSet); return code; @@ -2095,7 +2090,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl (void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName); pReq->withTbName = tmq->withTbName; pReq->consumerId = tmq->consumerId; - pReq->timeout = timeout; + pReq->timeout = timeout < 0 ? INT32_MAX : timeout; pReq->epoch = tmq->epoch; pReq->reqOffset = pVg->offsetInfo.endOffset; pReq->head.vgId = pVg->vgId; @@ -2199,39 +2194,24 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg } static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) { - if (pTmq == NULL || pTopic == NULL || pVg == NULL) { - return TSDB_CODE_INVALID_MSG; - } SMqPollReq req = {0}; char* msg = NULL; SMqPollCbParam* pParam = NULL; SMsgSendInfo* sendInfo = NULL; int code = 0; + int lino = 0; tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg); int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); - if (msgSize < 0) { - code = TSDB_CODE_INVALID_MSG; - return code; - } + TSDB_CHECK_CONDITION(msgSize >= 0, code, lino, END, TSDB_CODE_INVALID_MSG); msg = taosMemoryCalloc(1, msgSize); - if (NULL == msg) { - return terrno; - } + TSDB_CHECK_NULL(msg, code, lino, END, terrno); - if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) { - code = TSDB_CODE_INVALID_MSG; - taosMemoryFreeClear(msg); - return code; - } + TSDB_CHECK_CONDITION(tSerializeSMqPollReq(msg, msgSize, &req) >= 0, code, lino, END, TSDB_CODE_INVALID_MSG); pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); - if (pParam == NULL) { - code = terrno; - taosMemoryFreeClear(msg); - return code; - } + TSDB_CHECK_NULL(pParam, code, lino, END, terrno); pParam->refId = pTmq->refId; tstrncpy(pParam->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN); @@ -2239,11 +2219,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p pParam->requestId = req.reqId; sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (sendInfo == NULL) { - taosMemoryFreeClear(pParam); - taosMemoryFreeClear(msg); - return terrno; - } + TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno); sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL}; sendInfo->requestId = req.reqId; @@ -2253,30 +2229,41 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p sendInfo->fp = tmqPollCb; sendInfo->msgType = TDMT_VND_TMQ_CONSUME; + msg = NULL; + pParam = NULL; + char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo); tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId); - if (code != 0) { - return code; - } + TSDB_CHECK_CODE(code, lino, END); pVg->pollCnt++; pVg->seekUpdated = false; // reset this flag. pTmq->pollCnt++; - return 0; + END: + if (code != 0){ + tqErrorC("%s failed at %d msg:%s", __func__, lino, tstrerror(code)); + } + taosMemoryFreeClear(pParam); + taosMemoryFreeClear(msg); + return code; } -// broadcast the poll request to all related vnodes static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { if (tmq == NULL) { return TSDB_CODE_INVALID_MSG; } int32_t code = 0; - taosWLockLatch(&tmq->lock); + + if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__LOST){ + code = TSDB_CODE_TMQ_CONSUMER_MISMATCH; + goto end; + } + int32_t numOfTopics = taosArrayGetSize(tmq->clientTopics); tqDebugC("consumer:0x%" PRIx64 " start to poll data, numOfTopics:%d", tmq->consumerId, numOfTopics); @@ -2325,7 +2312,7 @@ static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { } } -end: + end: taosWUnLockLatch(&tmq->lock); tqDebugC("consumer:0x%" PRIx64 " end to poll data, code:%d", tmq->consumerId, code); return code; @@ -2361,9 +2348,6 @@ static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){ SMqBatchMetaRsp batchMetaRsp; } MEMSIZE; - if (pollRspWrapper == NULL) { - return NULL; - } SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj)); if (pRspObj == NULL) { tqErrorC("buildRsp:failed to allocate memory"); @@ -2377,22 +2361,22 @@ static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){ return pRspObj; } -static void processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ - if (tmq == NULL || pRspWrapper == NULL) { - return; - } +static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ + int32_t code = 0; SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp; if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform - int32_t code = askEp(tmq, NULL, false, true); + code = askEp(tmq, NULL, false, true); if (code != 0) { tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); } } else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { - int32_t code = askEp(tmq, NULL, false, false); + code = askEp(tmq, NULL, false, false); if (code != 0) { tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); } + } else if (pRspWrapper->code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){ + code = 0; } tqInfoC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, tstrerror(pRspWrapper->code)); @@ -2404,17 +2388,18 @@ static void processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } taosWUnLockLatch(&tmq->lock); + + return code; } static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ - if (tmq == NULL || pRspWrapper == NULL) { - return NULL; - } + int32_t code = 0; SMqRspObj* pRspObj = NULL; if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId); SMqAskEpRsp* rspMsg = &pRspWrapper->epRsp; doUpdateLocalEp(tmq, pRspWrapper->epoch, rspMsg); + terrno = code; return pRspObj; } @@ -2425,6 +2410,7 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ if(pVg == NULL) { tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); + code = TSDB_CODE_TMQ_INVALID_VGID; goto END; } pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); @@ -2483,88 +2469,92 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ } END: + terrno = code; taosWUnLockLatch(&tmq->lock); return pRspObj; } -static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { - if (tmq == NULL) { - return NULL; - } +static void* tmqHandleAllRsp(tmq_t* tmq) { tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall)); + int32_t code = 0; void* returnVal = NULL; while (1) { SMqRspWrapper* pRspWrapper = NULL; if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) { - if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){ - return NULL; + code = taosReadAllQitems(tmq->mqueue, tmq->qall); + if (code == 0){ + goto END; } - if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) { - return NULL; + code = taosGetQitem(tmq->qall, (void**)&pRspWrapper); + if (code == 0) { + goto END; } } - tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType); + tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]); if (pRspWrapper->code != 0) { - processMqRspError(tmq, pRspWrapper); + code = processMqRspError(tmq, pRspWrapper); }else{ returnVal = processMqRsp(tmq, pRspWrapper); + code = terrno; } tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); - if(returnVal != NULL){ + if(returnVal != NULL || code != 0){ break; } } + END: + terrno = code; return returnVal; } TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { - if (tmq == NULL) return NULL; + int32_t lino = 0; + int32_t code = 0; + TSDB_CHECK_NULL(tmq, code, lino, END, TSDB_CODE_INVALID_PARA); void* rspObj = NULL; int64_t startTime = taosGetTimestampMs(); - tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, - timeout); - - // in no topic status, delayed task also need to be processed - if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { - tqInfoC("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId); - taosMsleep(500); // sleep for a while - return NULL; - } + tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, timeout); + TSDB_CHECK_CONDITION(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__INIT, code, lino, END, TSDB_CODE_TMQ_INVALID_STATUS); (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1); while (1) { - tmqHandleAllDelayedTask(tmq); + code = tmqHandleAllDelayedTask(tmq); + TSDB_CHECK_CODE(code, lino, END); - if (tmqPollImpl(tmq, timeout) < 0) { - tqErrorC("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId); - } + code = tmqPollImpl(tmq, timeout); + TSDB_CHECK_CODE(code, lino, END); - rspObj = tmqHandleAllRsp(tmq, timeout); + rspObj = tmqHandleAllRsp(tmq); if (rspObj) { tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); return (TAOS_RES*)rspObj; } + code = terrno; + TSDB_CHECK_CODE(code, lino, END); if (timeout >= 0) { int64_t currentTime = taosGetTimestampMs(); int64_t elapsedTime = currentTime - startTime; - if (elapsedTime > timeout || elapsedTime < 0) { - tqDebugC("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, - tmq->consumerId, tmq->epoch, startTime, currentTime); - return NULL; - } + TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, code, lino, END, 0); (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime)); } else { (void)tsem2_timewait(&tmq->rspSem, 1000); } } + + END: + terrno = code; + if (tmq != NULL) { + tqErrorC("consumer:0x%" PRIx64 " poll error at line:%d, msg:%s", tmq->consumerId, lino, tstrerror(terrno)); + } + return NULL; } static void displayConsumeStatistics(tmq_t* pTmq) { @@ -2572,7 +2562,7 @@ static void displayConsumeStatistics(tmq_t* pTmq) { taosRLockLatch(&pTmq->lock); int32_t numOfTopics = taosArrayGetSize(pTmq->clientTopics); tqInfoC("consumer:0x%" PRIx64 " closing poll:%" PRId64 " rows:%" PRId64 " topics:%d, final epoch:%d", - pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch); + pTmq->consumerId, pTmq->pollCnt, pTmq->totalRows, numOfTopics, pTmq->epoch); tqInfoC("consumer:0x%" PRIx64 " rows dist begin: ", pTmq->consumerId); for (int32_t i = 0; i < numOfTopics; ++i) { @@ -2597,14 +2587,14 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { tqInfoC("consumer:0x%" PRIx64 " start to unsubscribe consumer, status:%d", tmq->consumerId, status); displayConsumeStatistics(tmq); - if (status != TMQ_CONSUMER_STATUS__READY) { + if (status != TMQ_CONSUMER_STATUS__READY && status != TMQ_CONSUMER_STATUS__LOST) { tqInfoC("consumer:0x%" PRIx64 " status:%d, already closed or not in ready state, no need unsubscribe", tmq->consumerId, status); goto END; } if (tmq->autoCommit) { code = tmq_commit_sync(tmq, NULL); if (code != 0) { - goto END; + goto END; } } tmqSendHbReq((void*)(tmq->refId), NULL); @@ -2620,7 +2610,7 @@ int32_t tmq_unsubscribe(tmq_t* tmq) { goto END; } -END: + END: return code; } @@ -2942,7 +2932,7 @@ void tmq_commit_offset_async(tmq_t* tmq, const char* pTopicName, int32_t vgId, i tqInfoC("consumer:0x%" PRIx64 " async send commit to vgId:%d, offset:%" PRId64 " code:%s", tmq->consumerId, vgId, offset, tstrerror(code)); -end: + end: if (code != 0 && cb != NULL) { if (code == TSDB_CODE_TMQ_SAME_COMMITTED_VALUE) code = TSDB_CODE_SUCCESS; cb(tmq, code, param); @@ -2951,9 +2941,9 @@ end: int32_t tmqGetNextResInfo(TAOS_RES* res, bool convertUcs4, SReqResultInfo** pResInfo) { - if (res == NULL || pResInfo == NULL) { - return TSDB_CODE_INVALID_PARA; - } + if (res == NULL || pResInfo == NULL) { + return TSDB_CODE_INVALID_PARA; + } SMqRspObj* pRspObj = (SMqRspObj*)res; SMqDataRsp* data = &pRspObj->dataRsp; @@ -3014,9 +3004,9 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { SMqRspHead* pHead = pMsg->pData; tmq_topic_assignment assignment = {.begin = pHead->walsver, - .end = pHead->walever + 1, - .currentOffset = rsp.rspOffset.version, - .vgId = pParam->vgId}; + .end = pHead->walever + 1, + .currentOffset = rsp.rspOffset.version, + .vgId = pParam->vgId}; (void)taosThreadMutexLock(&pCommon->mutex); if (taosArrayPush(pCommon->pList, &assignment) == NULL) { @@ -3027,7 +3017,7 @@ static int32_t tmqGetWalInfoCb(void* param, SDataBuf* pMsg, int32_t code) { (void)taosThreadMutexUnlock(&pCommon->mutex); } -END: + END: pCommon->code = code; if (total == pParam->totalReq) { if (tsem2_post(&pCommon->rsp) != 0) { @@ -3084,7 +3074,7 @@ static int32_t tmCommittedCb(void* param, SDataBuf* pMsg, int32_t code) { tDecoderClear(&decoder); } -end: + end: if (pMsg) { taosMemoryFree(pMsg->pData); taosMemoryFree(pMsg->pEpSet); @@ -3290,7 +3280,7 @@ int64_t tmq_committed(tmq_t* tmq, const char* pTopicName, int32_t vgId) { committed = getCommittedFromServer(tmq, tname, vgId, &epSet); -end: + end: tqInfoC("consumer:0x%" PRIx64 " tmq_committed vgId:%d committed:%" PRId64, tmq->consumerId, vgId, committed); return committed; } @@ -3493,7 +3483,7 @@ int32_t tmq_get_topic_assignment(tmq_t* tmq, const char* pTopicName, tmq_topic_a } } -end: + end: if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(*assignment); *assignment = NULL; @@ -3647,4 +3637,4 @@ TAOS* tmq_get_connect(tmq_t* tmq) { return (TAOS*)(&(tmq->pTscObj->id)); } return NULL; -} +} \ No newline at end of file diff --git a/source/dnode/mnode/impl/src/mndConsumer.c b/source/dnode/mnode/impl/src/mndConsumer.c index 6e9dc6ab17..c70f10fc44 100644 --- a/source/dnode/mnode/impl/src/mndConsumer.c +++ b/source/dnode/mnode/impl/src/mndConsumer.c @@ -263,7 +263,7 @@ static int32_t mndProcessMqHbReq(SRpcMsg *pMsg) { MND_TMQ_RETURN_CHECK(mndAcquireConsumer(pMnode, consumerId, &pConsumer)); MND_TMQ_RETURN_CHECK(checkPrivilege(pMnode, pConsumer, &rsp, pMsg->info.conn.user)); atomic_store_32(&pConsumer->hbStatus, 0); - mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d %d", consumerId, req.pollFlag, pConsumer->pollStatus); + mDebug("consumer:0x%" PRIx64 " receive hb pollFlag:%d pollStatus:%d", consumerId, req.pollFlag, pConsumer->pollStatus); if (req.pollFlag == 1){ atomic_store_32(&pConsumer->pollStatus, 0); } diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 28a0d11757..12a803d1d8 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -112,13 +112,12 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); void tqDestroyTqHandle(void* data); // tqRead -int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset); +int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset, int64_t timeout); int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest); int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId); // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded); -int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type, int32_t vgId); void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId); @@ -178,6 +177,7 @@ int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, vo #define TQ_SUBSCRIBE_NAME "subscribe" #define TQ_OFFSET_NAME "offset-ver0" +#define TQ_POLL_MAX_TIME 1000 #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index b4dc610a6a..a65b118aea 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -15,16 +15,13 @@ #include "tq.h" -int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) { - int32_t code = TDB_CODE_SUCCESS; - int32_t lino = 0; - void* buf = NULL; - TSDB_CHECK_NULL(pBlock, code, lino, END, TSDB_CODE_INVALID_PARA); - TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA); +static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) { + int32_t code = 0; + int32_t lino = 0; size_t dataEncodeBufSize = blockGetEncodeSize(pBlock); int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize; - buf = taosMemoryCalloc(1, dataStrLen); + void* buf = taosMemoryCalloc(1, dataStrLen); TSDB_CHECK_NULL(buf, code, lino, END, terrno); SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf; @@ -35,16 +32,17 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols); TSDB_CHECK_CONDITION(actualLen >= 0, code, lino, END, terrno); + actualLen += sizeof(SRetrieveTableRspForTmq); TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &actualLen), code, lino, END, terrno); TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno); - tqDebug("add block data to block array, blockDataLen:%d, blockData:%p", actualLen, buf); + buf = NULL; END: - if (code != TSDB_CODE_SUCCESS) { - taosMemoryFree(buf); - tqError("%s failed at %d, failed to add block data to response:%s", __FUNCTION__, lino, tstrerror(code)); + if (code != 0){ + tqError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code)); } + taosMemoryFree(buf); return code; } @@ -173,7 +171,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* TSDB_CHECK_CODE(code, lino, END); qStreamSetSourceExcluded(task, pRequest->sourceExcluded); - uint64_t st = taosGetTimestampMs(); + int64_t st = taosGetTimestampMs(); while (1) { SSDataBlock* pDataBlock = NULL; code = getDataBlock(task, pHandle, vgId, &pDataBlock); @@ -192,7 +190,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pRsp->blockNum++; totalRows += pDataBlock->info.rows; - if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) { + if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) { break; } } @@ -207,22 +205,18 @@ END: return code; } -int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) { +int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset, int64_t timeout) { int32_t code = 0; int32_t lino = 0; char* tbName = NULL; SSchemaWrapper* pSW = NULL; - TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA); - TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA); - TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA); - TSDB_CHECK_NULL(pOffset, code, lino, END, TSDB_CODE_INVALID_PARA); - TSDB_CHECK_NULL(pBatchMetaRsp, code, lino, END, TSDB_CODE_INVALID_PARA); const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); TSDB_CHECK_CODE(code, lino, END); int32_t rowCnt = 0; + int64_t st = taosGetTimestampMs(); while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; @@ -236,20 +230,23 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat tbName = taosStrdup(qExtractTbnameFromTask(task)); TSDB_CHECK_NULL(tbName, code, lino, END, terrno); TSDB_CHECK_NULL(taosArrayPush(pRsp->blockTbName, &tbName), code, lino, END, terrno); + tqDebug("vgId:%d, add tbname:%s to rsp msg", pTq->pVnode->config.vgId, tbName); tbName = NULL; } if (pRsp->withSchema) { - pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); + SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); TSDB_CHECK_NULL(pSW, code, lino, END, terrno); TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno); pSW = NULL; } - code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision); + code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock), + pTq->pVnode->config.tsdbCfg.precision); TSDB_CHECK_CODE(code, lino, END); + pRsp->blockNum++; rowCnt += pDataBlock->info.rows; - if (rowCnt <= tmqRowSize) { + if (rowCnt <= tmqRowSize && (taosGetTimestampMs() - st <= TMIN(TQ_POLL_MAX_TIME, timeout))) { continue; } } @@ -283,11 +280,10 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat break; } } - tqDebug("%s:%d success", __FUNCTION__, lino); END: if (code != 0){ - tqDebug("%s:%d failed, code:%s", __FUNCTION__, lino, tstrerror(code) ); + tqError("%s failed at %d, vgId:%d, task exec error since %s", __FUNCTION__ , lino, pTq->pVnode->config.vgId, tstrerror(code)); } taosMemoryFree(pSW); taosMemoryFree(tbName); @@ -422,4 +418,4 @@ END: tqError("%s failed at %d, failed to scan log:%s", __FUNCTION__, lino, tstrerror(code)); } return code; -} +} \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 6ecb1b1b4d..197a45cdb9 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -101,7 +101,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand char formatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(formatBuf, TSDB_OFFSET_LEN, pOffsetVal); tqDebug("tmq poll: consumer:0x%" PRIx64 - ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64, + ", subkey %s, vgId:%d, existed offset found, offset reset to %s and continue.QID:0x%" PRIx64, consumerId, pHandle->subKey, vgId, formatBuf, pRequest->reqId); return 0; } else { @@ -138,7 +138,7 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand return code; } else if (pRequest->reqOffset.type == TMQ_OFFSET__RESET_NONE) { tqError("tmq poll: subkey:%s, no offset committed for consumer:0x%" PRIx64 - " in vg %d, subkey %s, reset none failed", + " in vg %d, subkey %s, reset none failed", pHandle->subKey, consumerId, vgId, pRequest->subKey); return TSDB_CODE_TQ_NO_COMMITTED_OFFSET; } @@ -231,7 +231,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset)); if (offset->type != TMQ_OFFSET__LOG) { - TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset)); + TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest->timeout)); if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) { code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); @@ -274,7 +274,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, - taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); + taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto END; } @@ -287,7 +287,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (totalRows > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, - taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); + taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto END; } @@ -349,7 +349,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf)); TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaLen, &tLen)); totalMetaRows++; - if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > 1000)) { + if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) { tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer); code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); goto END; @@ -372,10 +372,10 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded)); - if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) { + if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, - taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); + taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto END; } else { fetchVer++; @@ -386,7 +386,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, END: if (code != 0){ tqError("tmq poll: tqTaosxScanLog error. consumerId:0x%" PRIx64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, - pRequest->subKey); + pRequest->subKey); } tDeleteMqBatchMetaRsp(&btMetaRsp); tDeleteSTaosxRsp(&taosxRsp); @@ -794,4 +794,4 @@ _exit: taosMemoryFree(pBlock); } return code; -} +} \ No newline at end of file diff --git a/source/os/src/osMemory.c b/source/os/src/osMemory.c index fdbf4853ad..51454653ca 100644 --- a/source/os/src/osMemory.c +++ b/source/os/src/osMemory.c @@ -294,7 +294,10 @@ void *taosMemCalloc(int64_t num, int64_t size) { #ifdef USE_TD_MEMORY int32_t memorySize = num * size; char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1); - if (tmp == NULL) return NULL; + if (tmp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp; pTdMemoryInfo->memorySize = memorySize; @@ -328,6 +331,7 @@ void *taosMemRealloc(void *ptr, int64_t size) { TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo)); if (tpTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } @@ -335,7 +339,10 @@ void *taosMemRealloc(void *ptr, int64_t size) { memcpy(&tdMemoryInfo, pTdMemoryInfo, sizeof(TdMemoryInfo)); void *tmp = realloc(pTdMemoryInfo, size + sizeof(TdMemoryInfo)); - if (tmp == NULL) return NULL; + if (tmp == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } memcpy(tmp, &tdMemoryInfo, sizeof(TdMemoryInfo)); ((TdMemoryInfoPtr)tmp)->memorySize = size; diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b2a8c422f7..c57d278c3b 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -29,6 +29,7 @@ static threadlocal char tsErrMsgReturn[ERR_MSG_LEN] = {0}; int32_t* taosGetErrno() { return &tsErrno; } int32_t* taosGetErrln() { return &tsErrln; } char* taosGetErrMsg() { return tsErrMsgDetail; } +void taosClearErrMsg() { tsErrMsgDetail[0] = '\0'; } char* taosGetErrMsgReturn() { return tsErrMsgReturn; } #ifdef TAOS_ERROR_C @@ -856,6 +857,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only on TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_STATUS, "Invalid status, please subscribe topic first") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") diff --git a/tests/parallel_test/cases.task b/tests/parallel_test/cases.task index 5fce3821da..b84749f354 100644 --- a/tests/parallel_test/cases.task +++ b/tests/parallel_test/cases.task @@ -333,6 +333,7 @@ ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/td-33225.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_ts4563.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td32526.py +,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_td32471.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmq_replay.py ,,y,system-test,./pytest.sh python3 ./test.py -f 7-tmq/tmqSeekAndCommit.py ,,n,system-test,python3 ./test.py -f 7-tmq/tmq_offset.py diff --git a/tests/system-test/7-tmq/tmq_td32471.py b/tests/system-test/7-tmq/tmq_td32471.py new file mode 100644 index 0000000000..2672c1c3b8 --- /dev/null +++ b/tests/system-test/7-tmq/tmq_td32471.py @@ -0,0 +1,54 @@ +import taos +import sys +import time +import socket +import os +import threading + +from util.log import * +from util.sql import * +from util.cases import * +from util.dnodes import * +from util.common import * +from taos.tmq import * +sys.path.append("./7-tmq") +from tmqCommon import * + +class TDTestCase: + updatecfgDict = {'debugFlag': 135, 'asynclog': 0} + def init(self, conn, logSql, replicaVar=1): + self.replicaVar = int(replicaVar) + tdLog.debug(f"start to excute {__file__}") + tdSql.init(conn.cursor()) + #tdSql.init(conn.cursor(), logSql) # output sql.txt file + + def run(self): + tdSql.execute(f'create database if not exists db_32471') + tdSql.execute(f'use db_32471') + tdSql.execute(f'CREATE STABLE meters (ts TIMESTAMP, current FLOAT, voltage INT, phase FLOAT) TAGS (location BINARY(64), groupId INT)') + tdSql.execute("INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-05 14:38:05.000',10.30000,219,0.31000)") + + buildPath = tdCom.getBuildPath() + cmdStr = '%s/build/bin/tmq_td32471'%(buildPath) + # tdLog.info(cmdStr) + # os.system(cmdStr) + # + # tdSql.execute("drop topic db_32471_topic") + tdSql.execute(f'alter stable meters add column item_tags nchar(500)') + tdSql.execute(f'alter stable meters add column new_col nchar(100)') + tdSql.execute("create topic db_32471_topic as select * from db_32471.meters") + + tdSql.execute("INSERT INTO d1001 USING meters TAGS('California.SanFrancisco', 2) VALUES('2018-10-06 14:38:05.000',10.30000,219,0.31000, 'aaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', '1')") + + tdLog.info(cmdStr) + if os.system(cmdStr) != 0: + tdLog.exit(cmdStr) + + return + + def stop(self): + tdSql.close() + tdLog.success(f"{__file__} successfully executed") + +tdCases.addLinux(__file__, TDTestCase()) +tdCases.addWindows(__file__, TDTestCase()) \ No newline at end of file diff --git a/utils/test/c/CMakeLists.txt b/utils/test/c/CMakeLists.txt index cb0410e9bf..d1c049ef1e 100644 --- a/utils/test/c/CMakeLists.txt +++ b/utils/test/c/CMakeLists.txt @@ -6,6 +6,7 @@ add_executable(tmq_taosx_ci tmq_taosx_ci.c) add_executable(tmq_ts5466 tmq_ts5466.c) add_executable(tmq_td32526 tmq_td32526.c) add_executable(tmq_td32187 tmq_td32187.c) +add_executable(tmq_td32471 tmq_td32471.c) add_executable(tmq_write_raw_test tmq_write_raw_test.c) add_executable(write_raw_block_test write_raw_block_test.c) add_executable(sml_test sml_test.c) @@ -72,6 +73,13 @@ target_link_libraries( PUBLIC common PUBLIC os ) +target_link_libraries( + tmq_td32471 + PUBLIC ${TAOS_LIB} + PUBLIC util + PUBLIC common + PUBLIC os +) target_link_libraries( tmq_td32526 PUBLIC ${TAOS_LIB} diff --git a/utils/test/c/tmq_td32471.c b/utils/test/c/tmq_td32471.c new file mode 100644 index 0000000000..bf14e3f61b --- /dev/null +++ b/utils/test/c/tmq_td32471.c @@ -0,0 +1,93 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include +#include +#include +#include +#include +#include "cJSON.h" +#include "taos.h" +#include "tmsg.h" +#include "types.h" + +void tmq_commit_cb_print(tmq_t* tmq, int32_t code, void* param) { + printf("commit %d tmq %p param %p\n", code, tmq, param); +} + +tmq_t* build_consumer() { + tmq_conf_t* conf = tmq_conf_new(); + tmq_conf_set(conf, "group.id", "g1"); + tmq_conf_set(conf, "client.id", "my app 1"); + tmq_conf_set(conf, "td.connect.user", "root"); + tmq_conf_set(conf, "td.connect.pass", "taosdata"); + tmq_conf_set(conf, "msg.with.table.name", "true"); + tmq_conf_set(conf, "enable.auto.commit", "true"); + tmq_conf_set(conf, "auto.offset.reset", "earliest"); + tmq_conf_set(conf, "msg.consume.excluded", "1"); + tmq_conf_set(conf, "max.poll.interval.ms", "2000"); + tmq_conf_set(conf, "heartbeat.interval.ms", "100"); + + tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); + assert(tmq); + tmq_conf_destroy(conf); + return tmq; +} + +tmq_list_t* build_topic_list() { + tmq_list_t* topic_list = tmq_list_new(); + tmq_list_append(topic_list, "db_32471_topic"); + return topic_list; +} + +void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { + int32_t code; + + if ((code = tmq_subscribe(tmq, topics))) { + fprintf(stderr, "%% Failed to start consuming topics: %s\n", tmq_err2str(code)); + printf("subscribe err\n"); + return; + } + int32_t cnt = 0; + while (1) { + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); + if (tmqmessage) { + cnt++; + taos_free_result(tmqmessage); + } else { + ASSERT(taos_errno(NULL) == 0); + break; + } + } + + taosSsleep(5); + TAOS_RES* tmqmessage = tmq_consumer_poll(tmq, 1000); + ASSERT(tmqmessage == NULL); + ASSERT(taos_errno(NULL) == TSDB_CODE_TMQ_CONSUMER_MISMATCH); + + code = tmq_consumer_close(tmq); + if (code) + fprintf(stderr, "%% Failed to close consumer: %s\n", tmq_err2str(code)); + else + fprintf(stderr, "%% Consumer closed\n"); +} + +int main(int argc, char* argv[]) { + tmq_t* tmq = build_consumer(); + tmq_list_t* topic_list = build_topic_list(); + basic_consume_loop(tmq, topic_list); + tmq_list_destroy(topic_list); +} \ No newline at end of file diff --git a/utils/test/c/tmq_td32526.c b/utils/test/c/tmq_td32526.c index 42d38ec56c..b6e68c5efc 100644 --- a/utils/test/c/tmq_td32526.c +++ b/utils/test/c/tmq_td32526.c @@ -181,6 +181,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { printResult(tmqmessage); taos_free_result(tmqmessage); } else { + ASSERT(taos_errno(NULL) == 0); break; } }