Merge pull request #29436 from taosdata/fix/TD-32471
fix:[TD-32471]set error code to terrno if tmq_consumer_poll return NULL
This commit is contained in:
commit
187682790e
|
@ -509,7 +509,7 @@ For the OpenTSDB text protocol, the parsing of timestamps follows its official p
|
|||
- **Interface Description**: Used for polling to consume data. Each consumer can only call this interface in a single thread.
|
||||
- tmq: [Input] Points to a valid ws_tmq_t structure pointer, which represents a TMQ consumer object.
|
||||
- timeout: [Input] Polling timeout in milliseconds, a negative number indicates a default timeout of 1 second.
|
||||
- **Return Value**: Non-`NULL`: Success, returns a pointer to a WS_RES structure, which contains the received message. `NULL`: Failure, indicates no data. WS_RES results are consistent with taos_query results, and information in WS_RES can be obtained through various query interfaces, such as schema, etc.
|
||||
- **Return Value**: Non-`NULL`: Success, returns a pointer to a WS_RES structure, which contains the received message. `NULL`: indicates no data, the error code can be obtained through ws_errno (NULL), please refer to the reference manual for specific error message. WS_RES results are consistent with taos_query results, and information in WS_RES can be obtained through various query interfaces, such as schema, etc.
|
||||
|
||||
- `int32_t ws_tmq_consumer_close(ws_tmq_t *tmq)`
|
||||
- **Interface Description**: Used to close the ws_tmq_t structure. Must be used in conjunction with ws_tmq_consumer_new.
|
||||
|
@ -1195,7 +1195,7 @@ In addition to using SQL or parameter binding APIs to insert data, you can also
|
|||
- **Interface Description**: Used to poll for consuming data, each consumer can only call this interface in a single thread.
|
||||
- tmq: [Input] Points to a valid tmq_t structure pointer, representing a TMQ consumer object.
|
||||
- timeout: [Input] Polling timeout in milliseconds, a negative number indicates a default timeout of 1 second.
|
||||
- **Return Value**: Non-`NULL`: Success, returns a pointer to a TAOS_RES structure containing the received messages. `NULL`: Failure, indicates no data. TAOS_RES results are consistent with taos_query results, and information in TAOS_RES can be obtained through various query interfaces, such as schema, etc.
|
||||
- **Return Value**: Non-`NULL`: Success, returns a pointer to a TAOS_RES structure containing the received messages. `NULL`: indicates no data, the error code can be obtained through taos_errno (NULL), please refer to the reference manual for specific error message. TAOS_RES results are consistent with taos_query results, and information in TAOS_RES can be obtained through various query interfaces, such as schema, etc.
|
||||
|
||||
- `int32_t tmq_consumer_close(tmq_t *tmq)`
|
||||
- **Interface Description**: Used to close a tmq_t structure. Must be used in conjunction with tmq_consumer_new.
|
||||
|
|
|
@ -534,4 +534,7 @@ This document details the server error codes that may be encountered when using
|
|||
| 0x80004000 | Invalid message | The subscribed data is illegal, generally does not occur | Check the client-side error logs for details |
|
||||
| 0x80004001 | Consumer mismatch | The vnode requested for subscription and the reassigned vnode are inconsistent, usually occurs when new consumers join the same consumer group | Internal error, not exposed to users |
|
||||
| 0x80004002 | Consumer closed | The consumer no longer exists | Check if it has already been closed |
|
||||
| 0x80004017 | Invalid status, please subscribe topic first | tmq status invalidate | Without calling subscribe, directly poll data |
|
||||
| 0x80004018 | TMQ poll timeout | timeout is too small or there is no data to consume | Adjust the timeout parameter appropriately or check if the data has been consumed |
|
||||
| 0x80004100 | Stream task not exist | The stream computing task does not exist | Check the server-side error logs |
|
||||
|
||||
|
|
|
@ -509,7 +509,7 @@ TDengine 推荐数据库应用的每个线程都建立一个独立的连接,
|
|||
- **接口说明**:用于轮询消费数据,每一个消费者,只能单线程调用该接口。
|
||||
- tmq:[入参] 指向一个有效的 ws_tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
|
||||
- timeout:[入参] 轮询的超时时间,单位为毫秒,负数表示默认超时1秒。
|
||||
- **返回值**:非 `NULL`:成功,返回一个指向 WS_RES 结构体的指针,该结构体包含了接收到的消息。`NULL`:失败,表示没有数据。WS_RES 结果和 taos_query 返回结果一致,可通过查询的各种接口获取 WS_RES 里的信息,比如 schema 等。
|
||||
- **返回值**:非 `NULL`:成功,返回一个指向 WS_RES 结构体的指针,该结构体包含了接收到的消息。`NULL`:表示没有数据, 可通过 ws_errno(NULL) 获取错误码,具体错误码参见参考手册。WS_RES 结果和 taos_query 返回结果一致,可通过查询的各种接口获取 WS_RES 里的信息,比如 schema 等。
|
||||
|
||||
- `int32_t ws_tmq_consumer_close(ws_tmq_t *tmq)`
|
||||
- **接口说明**:用于关闭 ws_tmq_t 结构体。需与 ws_tmq_consumer_new 配合使用。
|
||||
|
@ -1187,7 +1187,7 @@ TDengine 的异步 API 均采用非阻塞调用模式。应用程序可以用多
|
|||
- **接口说明**:用于轮询消费数据,每一个消费者,只能单线程调用该接口。
|
||||
- tmq:[入参] 指向一个有效的 tmq_t 结构体指针,该结构体代表一个 TMQ 消费者对象。
|
||||
- timeout:[入参] 轮询的超时时间,单位为毫秒,负数表示默认超时1秒。
|
||||
- **返回值**:非 `NULL`:成功,返回一个指向 TAOS_RES 结构体的指针,该结构体包含了接收到的消息。。`NULL`:失败,表示没有数据。TAOS_RES 结果和 taos_query 返回结果一致,可通过查询的各种接口获取 TAOS_RES 里的信息,比如 schema 等。
|
||||
- **返回值**:非 `NULL`:成功,返回一个指向 TAOS_RES 结构体的指针,该结构体包含了接收到的消息。。`NULL`:表示没有数据,可通过taos_errno(NULL) 获取错误码,具体错误码参见参考手册。TAOS_RES 结果和 taos_query 返回结果一致,可通过查询的各种接口获取 TAOS_RES 里的信息,比如 schema 等。
|
||||
|
||||
- `int32_t tmq_consumer_close(tmq_t *tmq)`
|
||||
- **接口说明**:用于关闭 tmq_t 结构体。需与 tmq_consumer_new 配合使用。
|
||||
|
|
|
@ -554,5 +554,7 @@ description: TDengine 服务端的错误码列表和详细说明
|
|||
| 0x80004000 | Invalid message | 订阅到的数据非法,一般不会出现 | 具体查看client端的错误日志提示 |
|
||||
| 0x80004001 | Consumer mismatch | 订阅请求的vnode和重新分配的vnode不一致,一般存在于有新消费者加入相同消费者组里时 | 内部错误,不暴露给用户 |
|
||||
| 0x80004002 | Consumer closed | 消费者已经不存在了 | 查看是否已经close掉了 |
|
||||
| 0x80004017 | Invalid status, please subscribe topic first | 数据订阅状态不对 | 没有调用 subscribe,直接poll数据 |
|
||||
| 0x80004018 | TMQ poll timeout | 数据订阅超时,超时时间太短,或者数据消费完毕 | 可适当调大timeout 参数或者检测数据是否消费完毕 |
|
||||
| 0x80004100 | Stream task not exist | 流计算任务不存在 | 具体查看server端的错误日志 |
|
||||
|
||||
|
|
|
@ -123,6 +123,10 @@ enum {
|
|||
TMQ_MSG_TYPE__POLL_BATCH_META_RSP,
|
||||
};
|
||||
|
||||
static char* tmqMsgTypeStr[] = {
|
||||
"data", "meta", "ask ep", "meta data", "wal info", "batch meta"
|
||||
};
|
||||
|
||||
enum {
|
||||
STREAM_INPUT__DATA_SUBMIT = 1,
|
||||
STREAM_INPUT__DATA_BLOCK,
|
||||
|
|
|
@ -47,6 +47,7 @@ const char* terrstr();
|
|||
|
||||
char* taosGetErrMsgReturn();
|
||||
char* taosGetErrMsg();
|
||||
void taosClearErrMsg();
|
||||
int32_t* taosGetErrno();
|
||||
int32_t* taosGetErrln();
|
||||
int32_t taosGetErrSize();
|
||||
|
@ -1014,6 +1015,8 @@ int32_t taosGetErrSize();
|
|||
#define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014)
|
||||
#define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015)
|
||||
#define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016)
|
||||
#define TSDB_CODE_TMQ_INVALID_STATUS TAOS_DEF_ERROR_CODE(0, 0x4017)
|
||||
#define TSDB_CODE_TMQ_POLL_TIMEOUT TAOS_DEF_ERROR_CODE(0, 0x4018)
|
||||
|
||||
// stream
|
||||
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)
|
||||
|
|
|
@ -2572,6 +2572,7 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
|
|||
SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
taosClearErrMsg(); // clear global error message
|
||||
|
||||
return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
|
||||
}
|
||||
|
|
|
@ -1209,9 +1209,9 @@ static void initClientTopicFromRsp(SMqClientTopic* pTopic, SMqSubTopicEp* pTopic
|
|||
}
|
||||
|
||||
static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp* pRsp){
|
||||
if (tmq == NULL || newTopics == NULL || pRsp == NULL) {
|
||||
return;
|
||||
}
|
||||
if (tmq == NULL || newTopics == NULL || pRsp == NULL) {
|
||||
return;
|
||||
}
|
||||
SHashObj* pVgOffsetHashMap = taosHashInit(64, MurmurHash3_32, false, HASH_NO_LOCK);
|
||||
if (pVgOffsetHashMap == NULL) {
|
||||
tqErrorC("consumer:0x%" PRIx64 " taos hash init null, code:%d", tmq->consumerId, terrno);
|
||||
|
@ -1266,9 +1266,9 @@ static void buildNewTopicList(tmq_t* tmq, SArray* newTopics, const SMqAskEpRsp*
|
|||
}
|
||||
|
||||
static void doUpdateLocalEp(tmq_t* tmq, int32_t epoch, const SMqAskEpRsp* pRsp) {
|
||||
if (tmq == NULL || pRsp == NULL) {
|
||||
return;
|
||||
}
|
||||
if (tmq == NULL || pRsp == NULL) {
|
||||
return;
|
||||
}
|
||||
int32_t topicNumGet = taosArrayGetSize(pRsp->topics);
|
||||
// vnode transform (epoch == tmq->epoch && topicNumGet != 0)
|
||||
// ask ep rsp (epoch == tmq->epoch && topicNumGet == 0)
|
||||
|
@ -1388,49 +1388,32 @@ static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
|||
if (pTmq == NULL) {
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
SMqAskEpReq req = {0};
|
||||
req.consumerId = pTmq->consumerId;
|
||||
req.epoch = updateEpSet ? -1 : pTmq->epoch;
|
||||
tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN);
|
||||
int code = 0;
|
||||
SMqAskEpCbParam* pParam = NULL;
|
||||
void* pReq = NULL;
|
||||
|
||||
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
|
||||
if (tlen < 0) {
|
||||
tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
|
||||
TSDB_CHECK_CONDITION(tlen >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);
|
||||
pReq = taosMemoryCalloc(1, tlen);
|
||||
if (pReq == NULL) {
|
||||
tqErrorC("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
|
||||
return terrno;
|
||||
}
|
||||
TSDB_CHECK_NULL(pReq, code, lino, END, terrno);
|
||||
|
||||
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
|
||||
tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
|
||||
taosMemoryFree(pReq);
|
||||
return TSDB_CODE_INVALID_PARA;
|
||||
}
|
||||
code = tSerializeSMqAskEpReq(pReq, tlen, &req);
|
||||
TSDB_CHECK_CONDITION(code >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
|
||||
if (pParam == NULL) {
|
||||
tqErrorC("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
|
||||
taosMemoryFree(pReq);
|
||||
return terrno;
|
||||
}
|
||||
TSDB_CHECK_NULL(pParam, code, lino, END, terrno);
|
||||
|
||||
pParam->refId = pTmq->refId;
|
||||
pParam->sync = sync;
|
||||
pParam->pParam = param;
|
||||
|
||||
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
taosMemoryFree(pReq);
|
||||
taosMemoryFree(pParam);
|
||||
return terrno;
|
||||
}
|
||||
TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno);
|
||||
|
||||
sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
|
||||
sendInfo->requestId = generateRequestId();
|
||||
|
@ -1440,28 +1423,36 @@ static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
|
|||
sendInfo->fp = askEpCb;
|
||||
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
|
||||
|
||||
pReq = NULL;
|
||||
pParam = NULL;
|
||||
|
||||
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
|
||||
tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode,QID:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
|
||||
return asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
|
||||
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
|
||||
|
||||
END:
|
||||
if (code != 0) {
|
||||
tqErrorC("%s failed at %d, msg:%s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
taosMemoryFree(pReq);
|
||||
taosMemoryFree(pParam);
|
||||
return code;
|
||||
}
|
||||
|
||||
void tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
||||
if (pTmq == NULL) {
|
||||
return;
|
||||
}
|
||||
static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
||||
STaosQall* qall = NULL;
|
||||
int32_t code = 0;
|
||||
|
||||
code = taosAllocateQall(&qall);
|
||||
if (code) {
|
||||
tqErrorC("consumer:0x%" PRIx64 ", failed to allocate qall, code:%s", pTmq->consumerId, tstrerror(code));
|
||||
return;
|
||||
return code;
|
||||
}
|
||||
|
||||
int32_t numOfItems = taosReadAllQitems(pTmq->delayedTask, qall);
|
||||
if (numOfItems == 0) {
|
||||
taosFreeQall(qall);
|
||||
return;
|
||||
return 0;
|
||||
}
|
||||
|
||||
tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems);
|
||||
|
@ -1472,7 +1463,6 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
|||
code = askEp(pTmq, NULL, false, false);
|
||||
if (code != 0) {
|
||||
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code));
|
||||
continue;
|
||||
}
|
||||
tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
|
||||
bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
|
||||
|
@ -1494,6 +1484,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
|
|||
}
|
||||
|
||||
taosFreeQall(qall);
|
||||
return 0;
|
||||
}
|
||||
|
||||
void tmqClearUnhandleMsg(tmq_t* tmq) {
|
||||
|
@ -2095,7 +2086,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
|
|||
(void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);
|
||||
pReq->withTbName = tmq->withTbName;
|
||||
pReq->consumerId = tmq->consumerId;
|
||||
pReq->timeout = timeout;
|
||||
pReq->timeout = timeout < 0 ? INT32_MAX : timeout;
|
||||
pReq->epoch = tmq->epoch;
|
||||
pReq->reqOffset = pVg->offsetInfo.endOffset;
|
||||
pReq->head.vgId = pVg->vgId;
|
||||
|
@ -2199,39 +2190,24 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
|
|||
}
|
||||
|
||||
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
|
||||
if (pTmq == NULL || pTopic == NULL || pVg == NULL) {
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
}
|
||||
SMqPollReq req = {0};
|
||||
char* msg = NULL;
|
||||
SMqPollCbParam* pParam = NULL;
|
||||
SMsgSendInfo* sendInfo = NULL;
|
||||
int code = 0;
|
||||
int lino = 0;
|
||||
tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);
|
||||
|
||||
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
|
||||
if (msgSize < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
return code;
|
||||
}
|
||||
TSDB_CHECK_CONDITION(msgSize >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);
|
||||
|
||||
msg = taosMemoryCalloc(1, msgSize);
|
||||
if (NULL == msg) {
|
||||
return terrno;
|
||||
}
|
||||
TSDB_CHECK_NULL(msg, code, lino, END, terrno);
|
||||
|
||||
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
|
||||
code = TSDB_CODE_INVALID_MSG;
|
||||
taosMemoryFreeClear(msg);
|
||||
return code;
|
||||
}
|
||||
TSDB_CHECK_CONDITION(tSerializeSMqPollReq(msg, msgSize, &req) >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);
|
||||
|
||||
pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
|
||||
if (pParam == NULL) {
|
||||
code = terrno;
|
||||
taosMemoryFreeClear(msg);
|
||||
return code;
|
||||
}
|
||||
TSDB_CHECK_NULL(pParam, code, lino, END, terrno);
|
||||
|
||||
pParam->refId = pTmq->refId;
|
||||
tstrncpy(pParam->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
|
||||
|
@ -2239,11 +2215,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
|
|||
pParam->requestId = req.reqId;
|
||||
|
||||
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
|
||||
if (sendInfo == NULL) {
|
||||
taosMemoryFreeClear(pParam);
|
||||
taosMemoryFreeClear(msg);
|
||||
return terrno;
|
||||
}
|
||||
TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno);
|
||||
|
||||
sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
|
||||
sendInfo->requestId = req.reqId;
|
||||
|
@ -2253,23 +2225,29 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
|
|||
sendInfo->fp = tmqPollCb;
|
||||
sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
|
||||
|
||||
msg = NULL;
|
||||
pParam = NULL;
|
||||
|
||||
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
|
||||
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
|
||||
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
|
||||
tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId,
|
||||
pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
|
||||
if (code != 0) {
|
||||
return code;
|
||||
}
|
||||
TSDB_CHECK_CODE(code, lino, END);
|
||||
|
||||
pVg->pollCnt++;
|
||||
pVg->seekUpdated = false; // reset this flag.
|
||||
pTmq->pollCnt++;
|
||||
|
||||
return 0;
|
||||
END:
|
||||
if (code != 0){
|
||||
tqErrorC("%s failed at %d msg:%s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
taosMemoryFreeClear(pParam);
|
||||
taosMemoryFreeClear(msg);
|
||||
return code;
|
||||
}
|
||||
|
||||
// broadcast the poll request to all related vnodes
|
||||
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
|
||||
if (tmq == NULL) {
|
||||
return TSDB_CODE_INVALID_MSG;
|
||||
|
@ -2361,9 +2339,6 @@ static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
|
|||
SMqBatchMetaRsp batchMetaRsp;
|
||||
} MEMSIZE;
|
||||
|
||||
if (pollRspWrapper == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
SMqRspObj* pRspObj = taosMemoryCalloc(1, sizeof(SMqRspObj));
|
||||
if (pRspObj == NULL) {
|
||||
tqErrorC("buildRsp:failed to allocate memory");
|
||||
|
@ -2377,22 +2352,22 @@ static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
|
|||
return pRspObj;
|
||||
}
|
||||
|
||||
static void processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
|
||||
if (tmq == NULL || pRspWrapper == NULL) {
|
||||
return;
|
||||
}
|
||||
static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
|
||||
int32_t code = 0;
|
||||
SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
|
||||
|
||||
if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform
|
||||
int32_t code = askEp(tmq, NULL, false, true);
|
||||
code = askEp(tmq, NULL, false, true);
|
||||
if (code != 0) {
|
||||
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
|
||||
}
|
||||
} else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
|
||||
int32_t code = askEp(tmq, NULL, false, false);
|
||||
code = askEp(tmq, NULL, false, false);
|
||||
if (code != 0) {
|
||||
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
|
||||
}
|
||||
} else if (code == TSDB_CODE_TMQ_NO_TABLE_QUALIFIED){
|
||||
code = 0;
|
||||
}
|
||||
tqInfoC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId,
|
||||
tstrerror(pRspWrapper->code));
|
||||
|
@ -2404,17 +2379,18 @@ static void processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
|
|||
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
|
||||
}
|
||||
taosWUnLockLatch(&tmq->lock);
|
||||
|
||||
return code;
|
||||
}
|
||||
static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
|
||||
if (tmq == NULL || pRspWrapper == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
int32_t code = 0;
|
||||
SMqRspObj* pRspObj = NULL;
|
||||
|
||||
if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
|
||||
tqDebugC("consumer:0x%" PRIx64 " ep msg received", tmq->consumerId);
|
||||
SMqAskEpRsp* rspMsg = &pRspWrapper->epRsp;
|
||||
doUpdateLocalEp(tmq, pRspWrapper->epoch, rspMsg);
|
||||
terrno = code;
|
||||
return pRspObj;
|
||||
}
|
||||
|
||||
|
@ -2425,6 +2401,7 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
|
|||
if(pVg == NULL) {
|
||||
tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
|
||||
pollRspWrapper->topicName, pollRspWrapper->vgId);
|
||||
code = TSDB_CODE_TMQ_INVALID_VGID;
|
||||
goto END;
|
||||
}
|
||||
pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
|
||||
|
@ -2482,89 +2459,93 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
|
|||
pRspObj->resType = pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP ? RES_TYPE__TMQ_META : RES_TYPE__TMQ_BATCH_META;
|
||||
}
|
||||
|
||||
END:
|
||||
END:
|
||||
terrno = code;
|
||||
taosWUnLockLatch(&tmq->lock);
|
||||
return pRspObj;
|
||||
}
|
||||
|
||||
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
|
||||
if (tmq == NULL) {
|
||||
return NULL;
|
||||
}
|
||||
static void* tmqHandleAllRsp(tmq_t* tmq) {
|
||||
tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall));
|
||||
|
||||
int32_t code = 0;
|
||||
void* returnVal = NULL;
|
||||
while (1) {
|
||||
SMqRspWrapper* pRspWrapper = NULL;
|
||||
if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) {
|
||||
if (taosReadAllQitems(tmq->mqueue, tmq->qall) == 0){
|
||||
return NULL;
|
||||
code = taosReadAllQitems(tmq->mqueue, tmq->qall);
|
||||
if (code == 0){
|
||||
goto END;
|
||||
}
|
||||
if (taosGetQitem(tmq->qall, (void**)&pRspWrapper) == 0) {
|
||||
return NULL;
|
||||
code = taosGetQitem(tmq->qall, (void**)&pRspWrapper);
|
||||
if (code == 0) {
|
||||
goto END;
|
||||
}
|
||||
}
|
||||
|
||||
tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);
|
||||
tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]);
|
||||
if (pRspWrapper->code != 0) {
|
||||
processMqRspError(tmq, pRspWrapper);
|
||||
code = processMqRspError(tmq, pRspWrapper);
|
||||
}else{
|
||||
returnVal = processMqRsp(tmq, pRspWrapper);
|
||||
code = terrno;
|
||||
}
|
||||
tmqFreeRspWrapper(pRspWrapper);
|
||||
taosFreeQitem(pRspWrapper);
|
||||
if(returnVal != NULL){
|
||||
if(returnVal != NULL || code != 0){
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
END:
|
||||
terrno = code;
|
||||
return returnVal;
|
||||
}
|
||||
|
||||
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
|
||||
if (tmq == NULL) return NULL;
|
||||
int32_t lino = 0;
|
||||
int32_t code = 0;
|
||||
TSDB_CHECK_NULL(tmq, code, lino, END, TSDB_CODE_INVALID_PARA);
|
||||
|
||||
void* rspObj = NULL;
|
||||
int64_t startTime = taosGetTimestampMs();
|
||||
|
||||
tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
|
||||
timeout);
|
||||
|
||||
// in no topic status, delayed task also need to be processed
|
||||
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
|
||||
tqInfoC("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
|
||||
taosMsleep(500); // sleep for a while
|
||||
return NULL;
|
||||
}
|
||||
tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, timeout);
|
||||
TSDB_CHECK_CONDITION(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__INIT, code, lino, END, TSDB_CODE_TMQ_INVALID_STATUS);
|
||||
|
||||
(void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1);
|
||||
|
||||
while (1) {
|
||||
tmqHandleAllDelayedTask(tmq);
|
||||
code = tmqHandleAllDelayedTask(tmq);
|
||||
TSDB_CHECK_CODE(code, lino, END);
|
||||
|
||||
if (tmqPollImpl(tmq, timeout) < 0) {
|
||||
tqErrorC("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
|
||||
}
|
||||
code = tmqPollImpl(tmq, timeout);
|
||||
TSDB_CHECK_CODE(code, lino, END);
|
||||
|
||||
rspObj = tmqHandleAllRsp(tmq, timeout);
|
||||
rspObj = tmqHandleAllRsp(tmq);
|
||||
if (rspObj) {
|
||||
tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
|
||||
return (TAOS_RES*)rspObj;
|
||||
}
|
||||
code = terrno;
|
||||
TSDB_CHECK_CODE(code, lino, END);
|
||||
|
||||
if (timeout >= 0) {
|
||||
int64_t currentTime = taosGetTimestampMs();
|
||||
int64_t elapsedTime = currentTime - startTime;
|
||||
if (elapsedTime > timeout || elapsedTime < 0) {
|
||||
tqDebugC("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
|
||||
tmq->consumerId, tmq->epoch, startTime, currentTime);
|
||||
return NULL;
|
||||
}
|
||||
TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, code, lino, END, TSDB_CODE_TMQ_POLL_TIMEOUT);
|
||||
(void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime));
|
||||
} else {
|
||||
(void)tsem2_timewait(&tmq->rspSem, 1000);
|
||||
}
|
||||
}
|
||||
|
||||
END:
|
||||
terrno = code;
|
||||
if (tmq != NULL) {
|
||||
tqErrorC("consumer:0x%" PRIx64 " poll error at line:%d, msg:%s", tmq->consumerId, lino, tstrerror(terrno));
|
||||
}
|
||||
return NULL;
|
||||
}
|
||||
|
||||
static void displayConsumeStatistics(tmq_t* pTmq) {
|
||||
|
|
|
@ -112,13 +112,12 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
|
|||
void tqDestroyTqHandle(void* data);
|
||||
|
||||
// tqRead
|
||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset);
|
||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset, int64_t timeout);
|
||||
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest);
|
||||
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
|
||||
|
||||
// tqExec
|
||||
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded);
|
||||
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
|
||||
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
|
||||
int32_t type, int32_t vgId);
|
||||
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId);
|
||||
|
@ -176,8 +175,9 @@ int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, vo
|
|||
} \
|
||||
} while (0)
|
||||
|
||||
#define TQ_SUBSCRIBE_NAME "subscribe"
|
||||
#define TQ_OFFSET_NAME "offset-ver0"
|
||||
#define TQ_SUBSCRIBE_NAME "subscribe"
|
||||
#define TQ_OFFSET_NAME "offset-ver0"
|
||||
#define TQ_POLL_MAX_TIME 1000
|
||||
|
||||
#ifdef __cplusplus
|
||||
}
|
||||
|
|
|
@ -15,16 +15,13 @@
|
|||
|
||||
#include "tq.h"
|
||||
|
||||
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
|
||||
int32_t code = TDB_CODE_SUCCESS;
|
||||
int32_t lino = 0;
|
||||
void* buf = NULL;
|
||||
TSDB_CHECK_NULL(pBlock, code, lino, END, TSDB_CODE_INVALID_PARA);
|
||||
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
|
||||
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
|
||||
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
|
||||
int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize;
|
||||
buf = taosMemoryCalloc(1, dataStrLen);
|
||||
void* buf = taosMemoryCalloc(1, dataStrLen);
|
||||
TSDB_CHECK_NULL(buf, code, lino, END, terrno);
|
||||
|
||||
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
|
||||
|
@ -35,16 +32,17 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
|
|||
|
||||
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols);
|
||||
TSDB_CHECK_CONDITION(actualLen >= 0, code, lino, END, terrno);
|
||||
|
||||
actualLen += sizeof(SRetrieveTableRspForTmq);
|
||||
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &actualLen), code, lino, END, terrno);
|
||||
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
|
||||
|
||||
tqDebug("add block data to block array, blockDataLen:%d, blockData:%p", actualLen, buf);
|
||||
buf = NULL;
|
||||
END:
|
||||
if (code != TSDB_CODE_SUCCESS) {
|
||||
taosMemoryFree(buf);
|
||||
tqError("%s failed at %d, failed to add block data to response:%s", __FUNCTION__, lino, tstrerror(code));
|
||||
if (code != 0){
|
||||
tqError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
|
||||
}
|
||||
taosMemoryFree(buf);
|
||||
return code;
|
||||
}
|
||||
|
||||
|
@ -173,7 +171,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
|
|||
TSDB_CHECK_CODE(code, lino, END);
|
||||
|
||||
qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
|
||||
uint64_t st = taosGetTimestampMs();
|
||||
int64_t st = taosGetTimestampMs();
|
||||
while (1) {
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
|
||||
|
@ -192,7 +190,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
|
|||
|
||||
pRsp->blockNum++;
|
||||
totalRows += pDataBlock->info.rows;
|
||||
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
|
||||
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -207,22 +205,18 @@ END:
|
|||
return code;
|
||||
}
|
||||
|
||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) {
|
||||
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset, int64_t timeout) {
|
||||
int32_t code = 0;
|
||||
int32_t lino = 0;
|
||||
char* tbName = NULL;
|
||||
SSchemaWrapper* pSW = NULL;
|
||||
TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
|
||||
TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA);
|
||||
TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA);
|
||||
TSDB_CHECK_NULL(pOffset, code, lino, END, TSDB_CODE_INVALID_PARA);
|
||||
TSDB_CHECK_NULL(pBatchMetaRsp, code, lino, END, TSDB_CODE_INVALID_PARA);
|
||||
const STqExecHandle* pExec = &pHandle->execHandle;
|
||||
qTaskInfo_t task = pExec->task;
|
||||
code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
|
||||
TSDB_CHECK_CODE(code, lino, END);
|
||||
|
||||
int32_t rowCnt = 0;
|
||||
int64_t st = taosGetTimestampMs();
|
||||
while (1) {
|
||||
SSDataBlock* pDataBlock = NULL;
|
||||
uint64_t ts = 0;
|
||||
|
@ -236,20 +230,23 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
|
|||
tbName = taosStrdup(qExtractTbnameFromTask(task));
|
||||
TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
|
||||
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockTbName, &tbName), code, lino, END, terrno);
|
||||
tqDebug("vgId:%d, add tbname:%s to rsp msg", pTq->pVnode->config.vgId, tbName);
|
||||
tbName = NULL;
|
||||
}
|
||||
if (pRsp->withSchema) {
|
||||
pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
|
||||
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
|
||||
TSDB_CHECK_NULL(pSW, code, lino, END, terrno);
|
||||
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno);
|
||||
pSW = NULL;
|
||||
}
|
||||
|
||||
code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision);
|
||||
code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
|
||||
pTq->pVnode->config.tsdbCfg.precision);
|
||||
TSDB_CHECK_CODE(code, lino, END);
|
||||
|
||||
pRsp->blockNum++;
|
||||
rowCnt += pDataBlock->info.rows;
|
||||
if (rowCnt <= tmqRowSize) {
|
||||
if (rowCnt <= tmqRowSize && (taosGetTimestampMs() - st <= TMIN(TQ_POLL_MAX_TIME, timeout))) {
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
@ -283,11 +280,10 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
|
|||
break;
|
||||
}
|
||||
}
|
||||
|
||||
tqDebug("%s:%d success", __FUNCTION__, lino);
|
||||
END:
|
||||
if (code != 0){
|
||||
tqDebug("%s:%d failed, code:%s", __FUNCTION__, lino, tstrerror(code) );
|
||||
tqError("%s failed at %d, vgId:%d, task exec error since %s", __FUNCTION__ , lino, pTq->pVnode->config.vgId, tstrerror(code));
|
||||
}
|
||||
taosMemoryFree(pSW);
|
||||
taosMemoryFree(tbName);
|
||||
|
|
|
@ -231,7 +231,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
|
||||
TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
|
||||
if (offset->type != TMQ_OFFSET__LOG) {
|
||||
TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset));
|
||||
TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest->timeout));
|
||||
|
||||
if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
|
||||
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
|
||||
|
@ -349,7 +349,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf));
|
||||
TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaLen, &tLen));
|
||||
totalMetaRows++;
|
||||
if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > 1000)) {
|
||||
if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) {
|
||||
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
|
||||
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
|
||||
goto END;
|
||||
|
@ -372,7 +372,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
|||
|
||||
TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded));
|
||||
|
||||
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
|
||||
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) {
|
||||
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
|
||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
|
||||
taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId);
|
||||
|
|
|
@ -294,7 +294,10 @@ void *taosMemCalloc(int64_t num, int64_t size) {
|
|||
#ifdef USE_TD_MEMORY
|
||||
int32_t memorySize = num * size;
|
||||
char *tmp = calloc(memorySize + sizeof(TdMemoryInfo), 1);
|
||||
if (tmp == NULL) return NULL;
|
||||
if (tmp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)tmp;
|
||||
pTdMemoryInfo->memorySize = memorySize;
|
||||
|
@ -328,6 +331,7 @@ void *taosMemRealloc(void *ptr, int64_t size) {
|
|||
|
||||
TdMemoryInfoPtr pTdMemoryInfo = (TdMemoryInfoPtr)((char *)ptr - sizeof(TdMemoryInfo));
|
||||
if (tpTdMemoryInfo->symbol != TD_MEMORY_SYMBOL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
|
@ -335,7 +339,10 @@ void *taosMemRealloc(void *ptr, int64_t size) {
|
|||
memcpy(&tdMemoryInfo, pTdMemoryInfo, sizeof(TdMemoryInfo));
|
||||
|
||||
void *tmp = realloc(pTdMemoryInfo, size + sizeof(TdMemoryInfo));
|
||||
if (tmp == NULL) return NULL;
|
||||
if (tmp == NULL) {
|
||||
terrno = TSDB_CODE_OUT_OF_MEMORY;
|
||||
return NULL;
|
||||
}
|
||||
|
||||
memcpy(tmp, &tdMemoryInfo, sizeof(TdMemoryInfo));
|
||||
((TdMemoryInfoPtr)tmp)->memorySize = size;
|
||||
|
|
|
@ -29,6 +29,7 @@ static threadlocal char tsErrMsgReturn[ERR_MSG_LEN] = {0};
|
|||
int32_t* taosGetErrno() { return &tsErrno; }
|
||||
int32_t* taosGetErrln() { return &tsErrln; }
|
||||
char* taosGetErrMsg() { return tsErrMsgDetail; }
|
||||
void taosClearErrMsg() { tsErrMsgDetail[0] = '\0'; }
|
||||
char* taosGetErrMsgReturn() { return tsErrMsgReturn; }
|
||||
|
||||
#ifdef TAOS_ERROR_C
|
||||
|
@ -857,6 +858,8 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only on
|
|||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_STATUS, "Invalid status, please subscribe topic first")
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_POLL_TIMEOUT, "TMQ poll timeout")
|
||||
|
||||
// stream
|
||||
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")
|
||||
|
|
|
@ -181,6 +181,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
|
|||
printResult(tmqmessage);
|
||||
taos_free_result(tmqmessage);
|
||||
} else {
|
||||
ASSERT(taos_errno(NULL) == TSDB_CODE_TMQ_POLL_TIMEOUT);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue