From 226701ad1473abdc4258119fd6b71ebdcc366181 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Tue, 31 Dec 2024 16:48:23 +0800 Subject: [PATCH] fix:[TD-32471]set error code to terrno if tmq_consumer_poll returns NULL --- include/common/tcommon.h | 4 + include/util/taoserror.h | 1 + source/client/src/clientRawBlockWrite.c | 1 + source/client/src/clientTmq.c | 169 ++++++++++-------------- source/dnode/vnode/src/inc/tq.h | 8 +- source/dnode/vnode/src/tq/tqScan.c | 109 +++++++-------- source/dnode/vnode/src/tq/tqUtil.c | 6 +- source/util/src/terror.c | 1 + utils/test/c/tmq_td32526.c | 1 + 9 files changed, 131 insertions(+), 169 deletions(-) diff --git a/include/common/tcommon.h b/include/common/tcommon.h index 61cd482c70..0450766535 100644 --- a/include/common/tcommon.h +++ b/include/common/tcommon.h @@ -123,6 +123,10 @@ enum { TMQ_MSG_TYPE__POLL_BATCH_META_RSP, }; +static char* tmqMsgTypeStr[] = { + "data", "meta", "ask ep", "meta data", "wal info", "batch meta" +}; + enum { STREAM_INPUT__DATA_SUBMIT = 1, STREAM_INPUT__DATA_BLOCK, diff --git a/include/util/taoserror.h b/include/util/taoserror.h index e317fdd65a..3e2bb0080d 100644 --- a/include/util/taoserror.h +++ b/include/util/taoserror.h @@ -1013,6 +1013,7 @@ int32_t taosGetErrSize(); #define TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT TAOS_DEF_ERROR_CODE(0, 0x4014) #define TSDB_CODE_TMQ_NO_TABLE_QUALIFIED TAOS_DEF_ERROR_CODE(0, 0x4015) #define TSDB_CODE_TMQ_NO_NEED_REBALANCE TAOS_DEF_ERROR_CODE(0, 0x4016) +#define TSDB_CODE_TMQ_INVALID_STATUS TAOS_DEF_ERROR_CODE(0, 0x4017) // stream #define TSDB_CODE_STREAM_TASK_NOT_EXIST TAOS_DEF_ERROR_CODE(0, 0x4100) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 613645c4cd..178ab6b253 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -2572,6 +2572,7 @@ int32_t tmq_write_raw(TAOS* taos, tmq_raw_data raw) { SET_ERROR_MSG("taos:%p or data:%p is NULL or raw_len <= 0", taos, raw.raw); return 
TSDB_CODE_INVALID_PARA; } + SET_ERROR_MSG(""); // clear global error message return writeRawImpl(taos, raw.raw, raw.raw_len, raw.raw_type); } diff --git a/source/client/src/clientTmq.c b/source/client/src/clientTmq.c index 0cbdfc13e0..7b1365fe82 100644 --- a/source/client/src/clientTmq.c +++ b/source/client/src/clientTmq.c @@ -1388,49 +1388,32 @@ static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { if (pTmq == NULL) { return TSDB_CODE_INVALID_PARA; } + int32_t code = 0; + int32_t lino = 0; SMqAskEpReq req = {0}; req.consumerId = pTmq->consumerId; req.epoch = updateEpSet ? -1 : pTmq->epoch; tstrncpy(req.cgroup, pTmq->groupId, TSDB_CGROUP_LEN); - int code = 0; SMqAskEpCbParam* pParam = NULL; void* pReq = NULL; int32_t tlen = tSerializeSMqAskEpReq(NULL, 0, &req); - if (tlen < 0) { - tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq failed", pTmq->consumerId); - return TSDB_CODE_INVALID_PARA; - } - + TSDB_CHECK_CONDITION(tlen >= 0, code, lino, END, TSDB_CODE_INVALID_PARA); pReq = taosMemoryCalloc(1, tlen); - if (pReq == NULL) { - tqErrorC("consumer:0x%" PRIx64 ", failed to malloc askEpReq msg, size:%d", pTmq->consumerId, tlen); - return terrno; - } + TSDB_CHECK_NULL(pReq, code, lino, END, terrno); - if (tSerializeSMqAskEpReq(pReq, tlen, &req) < 0) { - tqErrorC("consumer:0x%" PRIx64 ", tSerializeSMqAskEpReq %d failed", pTmq->consumerId, tlen); - taosMemoryFree(pReq); - return TSDB_CODE_INVALID_PARA; - } + code = tSerializeSMqAskEpReq(pReq, tlen, &req); + TSDB_CHECK_CONDITION(code >= 0, code, lino, END, TSDB_CODE_INVALID_PARA); pParam = taosMemoryCalloc(1, sizeof(SMqAskEpCbParam)); - if (pParam == NULL) { - tqErrorC("consumer:0x%" PRIx64 ", failed to malloc subscribe param", pTmq->consumerId); - taosMemoryFree(pReq); - return terrno; - } + TSDB_CHECK_NULL(pParam, code, lino, END, terrno); pParam->refId = pTmq->refId; pParam->sync = sync; pParam->pParam = param; SMsgSendInfo* sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if 
(sendInfo == NULL) { - taosMemoryFree(pReq); - taosMemoryFree(pParam); - return terrno; - } + TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno); sendInfo->msgInfo = (SDataBuf){.pData = pReq, .len = tlen, .handle = NULL}; sendInfo->requestId = generateRequestId(); @@ -1440,28 +1423,36 @@ static int32_t askEp(tmq_t* pTmq, void* param, bool sync, bool updateEpSet) { sendInfo->fp = askEpCb; sendInfo->msgType = TDMT_MND_TMQ_ASK_EP; + pReq = NULL; + pParam = NULL; + SEpSet epSet = getEpSet_s(&pTmq->pTscObj->pAppInfo->mgmtEp); tqDebugC("consumer:0x%" PRIx64 " ask ep from mnode,QID:0x%" PRIx64, pTmq->consumerId, sendInfo->requestId); - return asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); + code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &epSet, NULL, sendInfo); + +END: + if (code != 0) { + tqErrorC("%s failed at %d, msg:%s", __func__, lino, tstrerror(code)); + } + taosMemoryFree(pReq); + taosMemoryFree(pParam); + return code; } -void tmqHandleAllDelayedTask(tmq_t* pTmq) { - if (pTmq == NULL) { - return; - } +static int32_t tmqHandleAllDelayedTask(tmq_t* pTmq) { STaosQall* qall = NULL; int32_t code = 0; code = taosAllocateQall(&qall); if (code) { tqErrorC("consumer:0x%" PRIx64 ", failed to allocate qall, code:%s", pTmq->consumerId, tstrerror(code)); - return; + return code; } int32_t numOfItems = taosReadAllQitems(pTmq->delayedTask, qall); if (numOfItems == 0) { taosFreeQall(qall); - return; + return 0; } tqDebugC("consumer:0x%" PRIx64 " handle delayed %d tasks before poll data", pTmq->consumerId, numOfItems); @@ -1472,7 +1463,6 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) { code = askEp(pTmq, NULL, false, false); if (code != 0) { tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", pTmq->consumerId, tstrerror(code)); - continue; } tqDebugC("consumer:0x%" PRIx64 " retrieve ep from mnode in 1s", pTmq->consumerId); bool ret = taosTmrReset(tmqAssignAskEpTask, DEFAULT_ASKEP_INTERVAL, (void*)(pTmq->refId), 
tmqMgmt.timer, @@ -1494,6 +1484,7 @@ void tmqHandleAllDelayedTask(tmq_t* pTmq) { } taosFreeQall(qall); + return 0; } void tmqClearUnhandleMsg(tmq_t* tmq) { @@ -2095,7 +2086,7 @@ void tmqBuildConsumeReqImpl(SMqPollReq* pReq, tmq_t* tmq, int64_t timeout, SMqCl (void)snprintf(pReq->subKey, TSDB_SUBSCRIBE_KEY_LEN, "%s%s%s", tmq->groupId, TMQ_SEPARATOR, pTopic->topicName); pReq->withTbName = tmq->withTbName; pReq->consumerId = tmq->consumerId; - pReq->timeout = timeout; + pReq->timeout = timeout < 0 ? INT32_MAX : timeout; pReq->epoch = tmq->epoch; pReq->reqOffset = pVg->offsetInfo.endOffset; pReq->head.vgId = pVg->vgId; @@ -2199,39 +2190,24 @@ static void tmqBuildRspFromWrapperInner(SMqPollRspWrapper* pWrapper, SMqClientVg } static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* pVg, int64_t timeout) { - if (pTmq == NULL || pTopic == NULL || pVg == NULL) { - return TSDB_CODE_INVALID_MSG; - } SMqPollReq req = {0}; char* msg = NULL; SMqPollCbParam* pParam = NULL; SMsgSendInfo* sendInfo = NULL; int code = 0; + int lino = 0; tmqBuildConsumeReqImpl(&req, pTmq, timeout, pTopic, pVg); int32_t msgSize = tSerializeSMqPollReq(NULL, 0, &req); - if (msgSize < 0) { - code = TSDB_CODE_INVALID_MSG; - return code; - } + TSDB_CHECK_CONDITION(msgSize >= 0, code, lino, END, TSDB_CODE_INVALID_MSG); msg = taosMemoryCalloc(1, msgSize); - if (NULL == msg) { - return terrno; - } + TSDB_CHECK_NULL(msg, code, lino, END, terrno); - if (tSerializeSMqPollReq(msg, msgSize, &req) < 0) { - code = TSDB_CODE_INVALID_MSG; - taosMemoryFreeClear(msg); - return code; - } + TSDB_CHECK_CONDITION(tSerializeSMqPollReq(msg, msgSize, &req) >= 0, code, lino, END, TSDB_CODE_INVALID_MSG); pParam = taosMemoryMalloc(sizeof(SMqPollCbParam)); - if (pParam == NULL) { - code = terrno; - taosMemoryFreeClear(msg); - return code; - } + TSDB_CHECK_NULL(pParam, code, lino, END, terrno); pParam->refId = pTmq->refId; tstrncpy(pParam->topicName, pTopic->topicName, TSDB_TOPIC_FNAME_LEN); @@ -2239,11 +2215,7 
@@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p pParam->requestId = req.reqId; sendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); - if (sendInfo == NULL) { - taosMemoryFreeClear(pParam); - taosMemoryFreeClear(msg); - return terrno; - } + TSDB_CHECK_NULL(sendInfo, code, lino, END, terrno); sendInfo->msgInfo = (SDataBuf){.pData = msg, .len = msgSize, .handle = NULL}; sendInfo->requestId = req.reqId; @@ -2253,23 +2225,29 @@ static int32_t doTmqPollImpl(tmq_t* pTmq, SMqClientTopic* pTopic, SMqClientVg* p sendInfo->fp = tmqPollCb; sendInfo->msgType = TDMT_VND_TMQ_CONSUME; + msg = NULL; + pParam = NULL; + char offsetFormatBuf[TSDB_OFFSET_LEN] = {0}; tFormatOffset(offsetFormatBuf, tListLen(offsetFormatBuf), &pVg->offsetInfo.endOffset); code = asyncSendMsgToServer(pTmq->pTscObj->pAppInfo->pTransporter, &pVg->epSet, NULL, sendInfo); tqDebugC("consumer:0x%" PRIx64 " send poll to %s vgId:%d, code:%d, epoch %d, req:%s,QID:0x%" PRIx64, pTmq->consumerId, pTopic->topicName, pVg->vgId, code, pTmq->epoch, offsetFormatBuf, req.reqId); - if (code != 0) { - return code; - } + TSDB_CHECK_CODE(code, lino, END); pVg->pollCnt++; pVg->seekUpdated = false; // reset this flag. 
pTmq->pollCnt++; - return 0; +END: + if (code != 0){ + tqErrorC("%s failed at %d msg:%s", __func__, lino, tstrerror(code)); + } + taosMemoryFreeClear(pParam); + taosMemoryFreeClear(msg); + return code; } -// broadcast the poll request to all related vnodes static int32_t tmqPollImpl(tmq_t* tmq, int64_t timeout) { if (tmq == NULL) { return TSDB_CODE_INVALID_MSG; @@ -2377,22 +2355,23 @@ static SMqRspObj* buildRsp(SMqPollRspWrapper* pollRspWrapper){ return pRspObj; } -static void processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ - if (tmq == NULL || pRspWrapper == NULL) { - return; - } +static int32_t processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ SMqPollRspWrapper* pollRspWrapper = &pRspWrapper->pollRsp; if (pRspWrapper->code == TSDB_CODE_VND_INVALID_VGROUP_ID) { // for vnode transform int32_t code = askEp(tmq, NULL, false, true); if (code != 0) { tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); + return code; } } else if (pRspWrapper->code == TSDB_CODE_TMQ_CONSUMER_MISMATCH) { int32_t code = askEp(tmq, NULL, false, false); if (code != 0) { tqErrorC("consumer:0x%" PRIx64 " failed to ask ep, code:%s", tmq->consumerId, tstrerror(code)); + return code; } + } else{ + return pRspWrapper->code; } tqInfoC("consumer:0x%" PRIx64 " msg from vgId:%d discarded, since %s", tmq->consumerId, pollRspWrapper->vgId, tstrerror(pRspWrapper->code)); @@ -2404,11 +2383,9 @@ static void processMqRspError(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ atomic_store_32(&pVg->vgStatus, TMQ_VG_STATUS__IDLE); } taosWUnLockLatch(&tmq->lock); + return TSDB_CODE_SUCCESS; } static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ - if (tmq == NULL || pRspWrapper == NULL) { - return NULL; - } SMqRspObj* pRspObj = NULL; if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__EP_RSP) { @@ -2425,6 +2402,7 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ if(pVg == NULL) { tqErrorC("consumer:0x%" PRIx64 " get vg or 
topic error, topic:%s vgId:%d", tmq->consumerId, pollRspWrapper->topicName, pollRspWrapper->vgId); + terrno = TSDB_CODE_TMQ_INVALID_VGID; goto END; } pollRspWrapper->topicHandle = getTopicInfo(tmq, pollRspWrapper->topicName); @@ -2487,10 +2465,7 @@ static SMqRspObj* processMqRsp(tmq_t* tmq, SMqRspWrapper* pRspWrapper){ return pRspObj; } -static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { - if (tmq == NULL) { - return NULL; - } +static void* tmqHandleAllRsp(tmq_t* tmq) { tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall)); void* returnVal = NULL; @@ -2505,15 +2480,15 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { } } - tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%d", tmq->consumerId, pRspWrapper->tmqRspType); + tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]); if (pRspWrapper->code != 0) { - processMqRspError(tmq, pRspWrapper); + terrno = processMqRspError(tmq, pRspWrapper); }else{ returnVal = processMqRsp(tmq, pRspWrapper); } tmqFreeRspWrapper(pRspWrapper); taosFreeQitem(pRspWrapper); - if(returnVal != NULL){ + if(returnVal != NULL || terrno != 0){ break; } } @@ -2522,49 +2497,47 @@ static void* tmqHandleAllRsp(tmq_t* tmq, int64_t timeout) { } TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t timeout) { - if (tmq == NULL) return NULL; + int32_t lino = 0; + terrno = TSDB_CODE_SUCCESS; + TSDB_CHECK_NULL(tmq, terrno, lino, END, TSDB_CODE_INVALID_PARA); void* rspObj = NULL; int64_t startTime = taosGetTimestampMs(); - tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, - timeout); - - // in no topic status, delayed task also need to be processed - if (atomic_load_8(&tmq->status) == TMQ_CONSUMER_STATUS__INIT) { - tqInfoC("consumer:0x%" PRIx64 " poll return since consumer is init", tmq->consumerId); - taosMsleep(500); // sleep for a while - return NULL; - } + 
tqDebugC("consumer:0x%" PRIx64 " start to poll at %" PRId64 ", timeout:%" PRId64, tmq->consumerId, startTime, timeout); + TSDB_CHECK_CONDITION(atomic_load_8(&tmq->status) != TMQ_CONSUMER_STATUS__INIT, terrno, lino, END, TSDB_CODE_TMQ_INVALID_STATUS); (void)atomic_val_compare_exchange_8(&tmq->pollFlag, 0, 1); while (1) { - tmqHandleAllDelayedTask(tmq); + terrno = tmqHandleAllDelayedTask(tmq); + TSDB_CHECK_CODE(terrno, lino, END); - if (tmqPollImpl(tmq, timeout) < 0) { - tqErrorC("consumer:0x%" PRIx64 " return due to poll error", tmq->consumerId); - } + terrno = tmqPollImpl(tmq, timeout); + TSDB_CHECK_CODE(terrno, lino, END); - rspObj = tmqHandleAllRsp(tmq, timeout); + rspObj = tmqHandleAllRsp(tmq); if (rspObj) { tqDebugC("consumer:0x%" PRIx64 " return rsp %p", tmq->consumerId, rspObj); return (TAOS_RES*)rspObj; } + TSDB_CHECK_CODE(terrno, lino, END); if (timeout >= 0) { int64_t currentTime = taosGetTimestampMs(); int64_t elapsedTime = currentTime - startTime; - if (elapsedTime > timeout || elapsedTime < 0) { - tqDebugC("consumer:0x%" PRIx64 " (epoch %d) timeout, no rsp, start time %" PRId64 ", current time %" PRId64, - tmq->consumerId, tmq->epoch, startTime, currentTime); - return NULL; - } + TSDB_CHECK_CONDITION(elapsedTime <= timeout && elapsedTime >= 0, terrno, lino, END, TSDB_CODE_TIMEOUT_ERROR); (void)tsem2_timewait(&tmq->rspSem, (timeout - elapsedTime)); } else { (void)tsem2_timewait(&tmq->rspSem, 1000); } } + +END: + if (tmq != NULL) { + tqErrorC("consumer:0x%" PRIx64 " poll error at line:%d, msg:%s", tmq->consumerId, lino, tstrerror(terrno)); + } + return NULL; } static void displayConsumeStatistics(tmq_t* pTmq) { diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index 28a0d11757..4340177035 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -112,13 +112,12 @@ int32_t tDecodeSTqHandle(SDecoder* pDecoder, STqHandle* pHandle); void tqDestroyTqHandle(void* data); // tqRead -int32_t tqScanTaosx(STQ* 
pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset); +int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* offset, int64_t timeout); int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest); int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId); // tqExec int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded); -int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision); int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type, int32_t vgId); void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId); @@ -176,8 +175,9 @@ int32_t tqExtractDropCtbDataBlock(const void* data, int32_t len, int64_t ver, vo } \ } while (0) -#define TQ_SUBSCRIBE_NAME "subscribe" -#define TQ_OFFSET_NAME "offset-ver0" +#define TQ_SUBSCRIBE_NAME "subscribe" +#define TQ_OFFSET_NAME "offset-ver0" +#define TQ_POLL_MAX_TIME 1000 #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 3419cd0020..a65e33ec97 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -15,16 +15,14 @@ #include "tq.h" -int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) { - if (pBlock == NULL || pRsp == NULL) { - return TSDB_CODE_INVALID_PARA; - } +static int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) { + int32_t code = 0; + int32_t lino = 0; + size_t dataEncodeBufSize = blockGetEncodeSize(pBlock); int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize; void* buf = taosMemoryCalloc(1, 
dataStrLen); - if (buf == NULL) { - return terrno; - } + TSDB_CHECK_NULL(buf, code, lino, END, terrno); SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf; pRetrieve->version = 1; @@ -33,27 +31,22 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols); - if(actualLen < 0){ - taosMemoryFree(buf); - return terrno; - } - actualLen += sizeof(SRetrieveTableRspForTmq); - if (taosArrayPush(pRsp->blockDataLen, &actualLen) == NULL){ - taosMemoryFree(buf); - return terrno; - } - if (taosArrayPush(pRsp->blockData, &buf) == NULL) { - taosMemoryFree(buf); - return terrno; - } + TSDB_CHECK_CONDITION(actualLen >= 0, code, lino, END, terrno); - return TSDB_CODE_SUCCESS; + actualLen += sizeof(SRetrieveTableRspForTmq); + TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &actualLen), code, lino, END, terrno); + TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno); + + buf = NULL; +END: + if (code != 0){ + tqError("%s failed at line %d with msg:%s", __func__, lino, tstrerror(code)); + } + taosMemoryFree(buf); + return code; } static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) { - if (pRsp == NULL || pTq == NULL) { - return TSDB_CODE_INVALID_PARA; - } SMetaReader mr = {0}; metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK); @@ -112,7 +105,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* TSDB_CHECK_CODE(code, line, END); qStreamSetSourceExcluded(task, pRequest->sourceExcluded); - uint64_t st = taosGetTimestampMs(); + int64_t st = taosGetTimestampMs(); while (1) { SSDataBlock* pDataBlock = NULL; code = getDataBlock(task, pHandle, vgId, &pDataBlock); @@ -172,7 +165,7 @@ int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pRsp->blockNum++; totalRows += 
pDataBlock->info.rows; - if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) { + if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) { break; } } @@ -189,68 +182,54 @@ END: return code; } -int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) { - if (pTq == NULL || pHandle == NULL || pRsp == NULL || pBatchMetaRsp == NULL || pOffset == NULL) { - return TSDB_CODE_INVALID_PARA; - } +int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset, int64_t timeout) { + int32_t code = 0; + int32_t lino = 0; + const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; - int code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); - if (code != 0) { - return code; - } + code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); + TSDB_CHECK_CODE(code, lino, END); int32_t rowCnt = 0; + int64_t st = taosGetTimestampMs(); while (1) { SSDataBlock* pDataBlock = NULL; uint64_t ts = 0; tqDebug("tmqsnap task start to execute"); code = qExecTask(task, &pDataBlock, &ts); - if (code != 0) { - tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, tstrerror(code)); - return code; - } - + TSDB_CHECK_CODE(code, lino, END); tqDebug("tmqsnap task execute end, get %p", pDataBlock); if (pDataBlock != NULL && pDataBlock->info.rows > 0) { if (pRsp->withTbName) { char* tbName = taosStrdup(qExtractTbnameFromTask(task)); - if (tbName == NULL) { - tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId); - return terrno; - } - if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL){ - tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId); - continue; - } + TSDB_CHECK_NULL(tbName, code, lino, END, terrno); + TSDB_CHECK_NULL(taosArrayPush(pRsp->blockTbName, &tbName), code, lino, 
END, terrno); + tqDebug("vgId:%d, add tbname:%s to rsp msg", pTq->pVnode->config.vgId, tbName); } if (pRsp->withSchema) { SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); - if(taosArrayPush(pRsp->blockSchema, &pSW) == NULL){ - tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId); - continue; - } + TSDB_CHECK_NULL(pSW, code, lino, END, terrno); + TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno); } - if (tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock), - pTq->pVnode->config.tsdbCfg.precision) != 0) { - tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId); - continue; - } + code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock), + pTq->pVnode->config.tsdbCfg.precision); + TSDB_CHECK_CODE(code, lino, END); + pRsp->blockNum++; rowCnt += pDataBlock->info.rows; - if (rowCnt <= tmqRowSize) continue; - + if (rowCnt <= tmqRowSize && (taosGetTimestampMs() - st <= TMIN(TQ_POLL_MAX_TIME, timeout))) { + continue; + } } // get meta SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task); if (taosArrayGetSize(tmp->batchMetaReq) > 0) { code = qStreamExtractOffset(task, &tmp->rspOffset); - if (code) { - return code; - } + TSDB_CHECK_CODE(code, lino, END); *pBatchMetaRsp = *tmp; tqDebug("tmqsnap task get meta"); @@ -259,9 +238,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat if (pDataBlock == NULL) { code = qStreamExtractOffset(task, pOffset); - if (code) { - break; - } + TSDB_CHECK_CODE(code, lino, END); if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { continue; @@ -280,6 +257,10 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat } } +END: + if(code != 0){ + tqError("%s failed at %d, vgId:%d, task exec error since %s", __FUNCTION__ , lino, pTq->pVnode->config.vgId, tstrerror(code)); + } return code; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c 
b/source/dnode/vnode/src/tq/tqUtil.c index f6a8563c70..c99acfd4d2 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -234,7 +234,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, TQ_ERR_GO_TO_END(tqInitTaosxRsp(&taosxRsp, *offset)); if (offset->type != TMQ_OFFSET__LOG) { - TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset)); + TQ_ERR_GO_TO_END(tqScanTaosx(pTq, pHandle, &taosxRsp, &btMetaRsp, offset, pRequest->timeout)); if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) { code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); @@ -378,7 +378,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, goto END; } totalMetaRows++; - if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > 1000)) { + if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) { tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer); code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); goto END; @@ -406,7 +406,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, goto END; } - if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) { + if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout))) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); code = tqSendDataRsp( pHandle, pMsg, pRequest, &taosxRsp, diff --git a/source/util/src/terror.c b/source/util/src/terror.c index b2a8c422f7..db6aa2889b 100644 --- a/source/util/src/terror.c +++ b/source/util/src/terror.c @@ -856,6 +856,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NEED_ONE_VGROUP, "Replay need only on TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_REPLAY_NOT_SUPPORT, "Replay is disabled if subscribe db or stable") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_TABLE_QUALIFIED, "No table qualified for 
query") TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_NO_NEED_REBALANCE, "No need rebalance") +TAOS_DEFINE_ERROR(TSDB_CODE_TMQ_INVALID_STATUS, "Invalid status, please subscribe topic first") // stream TAOS_DEFINE_ERROR(TSDB_CODE_STREAM_TASK_NOT_EXIST, "Stream task not exist") diff --git a/utils/test/c/tmq_td32526.c b/utils/test/c/tmq_td32526.c index 42d38ec56c..33cb586501 100644 --- a/utils/test/c/tmq_td32526.c +++ b/utils/test/c/tmq_td32526.c @@ -181,6 +181,7 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) { printResult(tmqmessage); taos_free_result(tmqmessage); } else { + ASSERT(taos_errno(NULL) == TSDB_CODE_TIMEOUT_ERROR); break; } }