fix:[TD-32471]set error code to terrno if tmq_consumer_poll return NULL

This commit is contained in:
wangmm0220 2024-12-31 16:48:23 +08:00
parent b6f79b3462
commit 226701ad14
9 changed files with 131 additions and 169 deletions

View File

@ -123,6 +123,10 @@ enum {
TMQ_MSG_TYPE__POLL_BATCH_META_RSP,
};
static char* tmqMsgTypeStr[] = {
"data", "meta", "ask ep", "meta data", "wal info", "batch meta"
};
enum {
STREAM_INPUT__DATA_SUBMIT = 1,
STREAM_INPUT__DATA_BLOCK,

View File

@ -1013,6 +1013,7 @@ int32_t taosGetErrSize();
#define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014)
#define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015)
#define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016)
#define TSDB_CODE_TMQ_INVALID_STATUS TAOS_DEF_ERROR_CODE(0, 0x4017)
// stream
#define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100)

View File

@ -2572,6 +2572,7 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) {
SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw);
return TSDB_CODE_INVALID_PARA;
}
SET_ERROR_MSG(""); // clear global error message
return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type);
}

View File

@ -1388,49 +1388,32 @@ static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
if (pTmq == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t code = 0;
int32_t lino = 0;
SMqAskEpReq req = {0};
req.consumerId = pTmq->consumerId;
req.epoch = updateEpSet ? -1 : pTmq->epoch;
tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN);
int code = 0;
SMqAskEpCbParam* pParam = NULL;
void* pReq = NULL;
int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req);
if (tlen < 0) {
tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId);
return TSDB_CODE_INVALID_PARA;
}
TSDB_CHECK_CONDITION(tlen >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);
pReq = taosMemoryCalloc(1, tlen);
if (pReq == NULL) {
tqErrorC("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen);
return terrno;
}
TSDB_CHECK_NULL(pReq, code, lino, END, terrno);
if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) {
tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen);
taosMemoryFree(pReq);
return TSDB_CODE_INVALID_PARA;
}
code = tSerializeSMqAskEpReq(pReq, tlen, &req);
TSDB_CHECK_CONDITION(code >= 0, code, lino, END, TSDB_CODE_INVALID_PARA);
pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam));
if (pParam == NULL) {
tqErrorC("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId);
taosMemoryFree(pReq);
return terrno;
}
TSDB_CHECK_NULL(pParam, code, lino, END, terrno);
pParam->refId = pTmq->refId;
pParam->sync = sync;
pParam->pParam = param;
SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFree(pReq);
taosMemoryFree(pParam);
return terrno;
}
TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno);
sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL};
sendInfo->requestId = generateRequestId();
@ -1440,28 +1423,36 @@ static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) {
sendInfo->fp = askEpCb;
sendInfo->msgType = TDMT_MND_TMQ_ASK_EP;
pReq = NULL;
pParam = NULL;
SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp);
tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode,QID:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId);
return asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo);
END:
if (code != 0) {
tqErrorC("%s failed at %d, msg:%s", __func__, lino, tstrerror(code));
}
taosMemoryFree(pReq);
taosMemoryFree(pParam);
return code;
}
void tmqHandleAllDelayedTask(tmq_t* pTmq) {
if (pTmq == NULL) {
return;
}
static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) {
STaosQall* qall = NULL;
int32_t code = 0;
code = taosAllocateQall(&qall);
if (code) {
tqErrorC("consumer:0x%" PRIx64 ", failed to allocate qall, code:%s", pTmq->consumerId, tstrerror(code));
return;
return code;
}
int32_t numOfItems = taosReadAllQitems(pTmq->delayedTask, qall);
if (numOfItems == 0) {
taosFreeQall(qall);
return;
return 0;
}
tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems);
@ -1472,7 +1463,6 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
code = askEp(pTmq, NULL, false, false);
if (code != 0) {
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code));
continue;
}
tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId);
bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), tmqMgmt.timer,
@ -1494,6 +1484,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) {
}
taosFreeQall(qall);
return 0;
}
void tmqClearUnhandleMsg(tmq_t* tmq) {
@ -2095,7 +2086,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl
(void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName);
pReq->withTbName = tmq->withTbName;
pReq->consumerId = tmq->consumerId;
pReq->timeout = timeout;
pReq->timeout = timeout < 0 ? INT32_MAX : timeout;
pReq->epoch = tmq->epoch;
pReq->reqOffset = pVg->offsetInfo.endOffset;
pReq->head.vgId = pVg->vgId;
@ -2199,39 +2190,24 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg
}
static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) {
if (pTmq == NULL || pTopic == NULL || pVg == NULL) {
return TSDB_CODE_INVALID_MSG;
}
SMqPollReq req = {0};
char* msg = NULL;
SMqPollCbParam* pParam = NULL;
SMsgSendInfo* sendInfo = NULL;
int code = 0;
int lino = 0;
tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg);
int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req);
if (msgSize < 0) {
code = TSDB_CODE_INVALID_MSG;
return code;
}
TSDB_CHECK_CONDITION(msgSize >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);
msg = taosMemoryCalloc(1, msgSize);
if (NULL == msg) {
return terrno;
}
TSDB_CHECK_NULL(msg, code, lino, END, terrno);
if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) {
code = TSDB_CODE_INVALID_MSG;
taosMemoryFreeClear(msg);
return code;
}
TSDB_CHECK_CONDITION(tSerializeSMqPollReq(msg, msgSize, &req) >= 0, code, lino, END, TSDB_CODE_INVALID_MSG);
pParam = taosMemoryMalloc(sizeof(SMqPollCbParam));
if (pParam == NULL) {
code = terrno;
taosMemoryFreeClear(msg);
return code;
}
TSDB_CHECK_NULL(pParam, code, lino, END, terrno);
pParam->refId = pTmq->refId;
tstrncpy(pParam->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN);
@ -2239,11 +2215,7 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
pParam->requestId = req.reqId;
sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo));
if (sendInfo == NULL) {
taosMemoryFreeClear(pParam);
taosMemoryFreeClear(msg);
return terrno;
}
TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno);
sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL};
sendInfo->requestId = req.reqId;
@ -2253,23 +2225,29 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p
sendInfo->fp = tmqPollCb;
sendInfo->msgType = TDMT_VND_TMQ_CONSUME;
msg = NULL;
pParam = NULL;
char offsetFormatBuf[TSDB_OFFSET_LEN] = {0};
tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset);
code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo);
tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId,
pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId);
if (code != 0) {
return code;
}
TSDB_CHECK_CODE(code, lino, END);
pVg->pollCnt++;
pVg->seekUpdated = false; // reset this flag.
pTmq->pollCnt++;
return 0;
END:
if (code != 0){
tqErrorC("%s failed at %d msg:%s", __func__, lino, tstrerror(code));
}
taosMemoryFreeClear(pParam);
taosMemoryFreeClear(msg);
return code;
}
// broadcast the poll request to all related vnodes
static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) {
if (tmq == NULL) {
return TSDB_CODE_INVALID_MSG;
@ -2377,22 +2355,23 @@ static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){
return pRspObj;
}
static void processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
if (tmq == NULL || pRspWrapper == NULL) {
return;
}
static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp;
if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform
int32_t code = askEp(tmq, NULL, false, true);
if (code != 0) {
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
return code;
}
} else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) {
int32_t code = askEp(tmq, NULL, false, false);
if (code != 0) {
tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code));
return code;
}
} else{
return pRspWrapper->code;
}
tqInfoC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId,
tstrerror(pRspWrapper->code));
@ -2404,11 +2383,9 @@ static void processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE);
}
taosWUnLockLatch(&tmq->lock);
return TSDB_CODE_SUCCESS;
}
static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
if (tmq == NULL || pRspWrapper == NULL) {
return NULL;
}
SMqRspObj* pRspObj = NULL;
if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) {
@ -2425,6 +2402,7 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
if(pVg == NULL) {
tqErrorC("consumer:0x%" PRIx64 " get vg or topic error, topic:%s vgId:%d", tmq->consumerId,
pollRspWrapper->topicName, pollRspWrapper->vgId);
terrno = TSDB_CODE_TMQ_INVALID_VGID;
goto END;
}
pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName);
@ -2487,10 +2465,7 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){
return pRspObj;
}
static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
if (tmq == NULL) {
return NULL;
}
static void* tmqHandleAllRsp(tmq_t* tmq) {
tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall));
void* returnVal = NULL;
@ -2505,15 +2480,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
}
}
tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType);
tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]);
if (pRspWrapper->code != 0) {
processMqRspError(tmq, pRspWrapper);
terrno = processMqRspError(tmq, pRspWrapper);
}else{
returnVal = processMqRsp(tmq, pRspWrapper);
}
tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pRspWrapper);
if(returnVal != NULL){
if(returnVal != NULL || terrno != 0){
break;
}
}
@ -2522,49 +2497,47 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) {
}
TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) {
if (tmq == NULL) return NULL;
int32_t lino = 0;
terrno = TSDB_CODE_SUCCESS;
TSDB_CHECK_NULL(tmq, terrno, lino, END, TSDB_CODE_INVALID_PARA);
void* rspObj = NULL;
int64_t startTime = taosGetTimestampMs();
tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime,
timeout);
// in no topic status, delayed task also need to be processed
if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) {
tqInfoC("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId);
taosMsleep(500); // sleep for a while
return NULL;
}
tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, timeout);
TSDB_CHECK_CONDITION(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__INIT, terrno, lino, END, TSDB_CODE_TMQ_INVALID_STATUS);
(void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1);
while (1) {
tmqHandleAllDelayedTask(tmq);
terrno = tmqHandleAllDelayedTask(tmq);
TSDB_CHECK_CODE(terrno, lino, END);
if (tmqPollImpl(tmq, timeout) < 0) {
tqErrorC("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId);
}
terrno = tmqPollImpl(tmq, timeout);
TSDB_CHECK_CODE(terrno, lino, END);
rspObj = tmqHandleAllRsp(tmq, timeout);
rspObj = tmqHandleAllRsp(tmq);
if (rspObj) {
tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj);
return (TAOS_RES*)rspObj;
}
TSDB_CHECK_CODE(terrno, lino, END);
if (timeout >= 0) {
int64_t currentTime = taosGetTimestampMs();
int64_t elapsedTime = currentTime - startTime;
if (elapsedTime > timeout || elapsedTime < 0) {
tqDebugC("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64,
tmq->consumerId, tmq->epoch, startTime, currentTime);
return NULL;
}
TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, terrno, lino, END, TSDB_CODE_TIMEOUT_ERROR);
(void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime));
} else {
(void)tsem2_timewait(&tmq->rspSem, 1000);
}
}
END:
if (tmq != NULL) {
tqErrorC("consumer:0x%" PRIx64 " poll error at line:%d, msg:%s", tmq->consumerId, lino, tstrerror(terrno));
}
return NULL;
}
static void displayConsumeStatistics(tmq_t* pTmq) {

View File

@ -112,13 +112,12 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle);
void tqDestroyTqHandle(void* data);
// tqRead
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset);
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset, int64_t timeout);
int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest);
int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId);
// tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded);
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t type, int32_t vgId);
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId);
@ -178,6 +177,7 @@ int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, vo
#define TQ_SUBSCRIBE_NAME "subscribe"
#define TQ_OFFSET_NAME "offset-ver0"
#define TQ_POLL_MAX_TIME 1000
#ifdef __cplusplus
}

View File

@ -15,16 +15,14 @@
#include "tq.h"
int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
if (pBlock == NULL || pRsp == NULL) {
return TSDB_CODE_INVALID_PARA;
}
static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) {
int32_t code = 0;
int32_t lino = 0;
size_t dataEncodeBufSize = blockGetEncodeSize(pBlock);
int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize;
void* buf = taosMemoryCalloc(1, dataStrLen);
if (buf == NULL) {
return terrno;
}
TSDB_CHECK_NULL(buf, code, lino, END, terrno);
SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf;
pRetrieve->version = 1;
@ -33,27 +31,22 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows);
int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols);
if(actualLen < 0){
taosMemoryFree(buf);
return terrno;
}
actualLen += sizeof(SRetrieveTableRspForTmq);
if (taosArrayPush(pRsp->blockDataLen, &actualLen) == NULL){
taosMemoryFree(buf);
return terrno;
}
if (taosArrayPush(pRsp->blockData, &buf) == NULL) {
taosMemoryFree(buf);
return terrno;
}
TSDB_CHECK_CONDITION(actualLen >= 0, code, lino, END, terrno);
return TSDB_CODE_SUCCESS;
actualLen += sizeof(SRetrieveTableRspForTmq);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &actualLen), code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
buf = NULL;
END:
if (code != 0){
tqError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code));
}
taosMemoryFree(buf);
return code;
}
static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) {
if (pRsp == NULL || pTq == NULL) {
return TSDB_CODE_INVALID_PARA;
}
SMetaReader mr = {0};
metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK);
@ -112,7 +105,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
TSDB_CHECK_CODE(code, line, END);
qStreamSetSourceExcluded(task, pRequest->sourceExcluded);
uint64_t st = taosGetTimestampMs();
int64_t st = taosGetTimestampMs();
while (1) {
SSDataBlock* pDataBlock = NULL;
code = getDataBlock(task, pHandle, vgId, &pDataBlock);
@ -172,7 +165,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal*
pRsp->blockNum++;
totalRows += pDataBlock->info.rows;
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) {
break;
}
}
@ -189,68 +182,54 @@ END:
return code;
}
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) {
if (pTq == NULL || pHandle == NULL || pRsp == NULL || pBatchMetaRsp == NULL || pOffset == NULL) {
return TSDB_CODE_INVALID_PARA;
}
int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset, int64_t timeout) {
int32_t code = 0;
int32_t lino = 0;
const STqExecHandle* pExec = &pHandle->execHandle;
qTaskInfo_t task = pExec->task;
int code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
if (code != 0) {
return code;
}
code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType);
TSDB_CHECK_CODE(code, lino, END);
int32_t rowCnt = 0;
int64_t st = taosGetTimestampMs();
while (1) {
SSDataBlock* pDataBlock = NULL;
uint64_t ts = 0;
tqDebug("tmqsnap task start to execute");
code = qExecTask(task, &pDataBlock, &ts);
if (code != 0) {
tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, tstrerror(code));
return code;
}
TSDB_CHECK_CODE(code, lino, END);
tqDebug("tmqsnap task execute end, get %p", pDataBlock);
if (pDataBlock != NULL && pDataBlock->info.rows > 0) {
if (pRsp->withTbName) {
char* tbName = taosStrdup(qExtractTbnameFromTask(task));
if (tbName == NULL) {
tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId);
return terrno;
}
if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL){
tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId);
continue;
}
TSDB_CHECK_NULL(tbName, code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockTbName, &tbName), code, lino, END, terrno);
tqDebug("vgId:%d, add tbname:%s to rsp msg", pTq->pVnode->config.vgId, tbName);
}
if (pRsp->withSchema) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
if(taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
continue;
}
TSDB_CHECK_NULL(pSW, code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno);
}
if (tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision) != 0) {
tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId);
continue;
}
code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock),
pTq->pVnode->config.tsdbCfg.precision);
TSDB_CHECK_CODE(code, lino, END);
pRsp->blockNum++;
rowCnt += pDataBlock->info.rows;
if (rowCnt <= tmqRowSize) continue;
if (rowCnt <= tmqRowSize && (taosGetTimestampMs() - st <= TMIN(TQ_POLL_MAX_TIME, timeout))) {
continue;
}
}
// get meta
SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task);
if (taosArrayGetSize(tmp->batchMetaReq) > 0) {
code = qStreamExtractOffset(task, &tmp->rspOffset);
if (code) {
return code;
}
TSDB_CHECK_CODE(code, lino, END);
*pBatchMetaRsp = *tmp;
tqDebug("tmqsnap task get meta");
@ -259,9 +238,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
if (pDataBlock == NULL) {
code = qStreamExtractOffset(task, pOffset);
if (code) {
break;
}
TSDB_CHECK_CODE(code, lino, END);
if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
continue;
@ -280,6 +257,10 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
}
}
END:
if(code != 0){
tqError("%s failed at %d, vgId:%d, task exec error since %s", __FUNCTION__ , lino, pTq->pVnode->config.vgId, tstrerror(code));
}
return code;
}

View File

@ -234,7 +234,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset));
if (offset->type != TMQ_OFFSET__LOG) {
TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset));
TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest->timeout));
if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) {
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
@ -378,7 +378,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
goto END;
}
totalMetaRows++;
if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > 1000)) {
if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) {
tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer);
code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId);
goto END;
@ -406,7 +406,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
goto END;
}
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) {
if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) {
tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1);
code = tqSendDataRsp(
pHandle, pMsg, pRequest, &taosxRsp,

View File

@ -856,6 +856,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only on
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for query")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance")
TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_STATUS, "Invalid status, please subscribe topic first")
// stream
TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist")

View File

@ -181,6 +181,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
printResult(tmqmessage);
taos_free_result(tmqmessage);
} else {
ASSERT(taos_errno(NULL) == TSDB_CODE_TIMEOUT_ERROR);
break;
}
}