From b63787aa09622fbab86f7dfd9596a61c3e98bc09 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Mon, 30 Dec 2024 17:23:11 +0800 Subject: [PATCH] fix:[TD-33396]add log for tmq --- source/dnode/vnode/src/tq/tq.c | 2 +- source/dnode/vnode/src/tq/tqRead.c | 107 +++---- source/dnode/vnode/src/tq/tqScan.c | 438 ++++++++++++++--------------- source/dnode/vnode/src/tq/tqUtil.c | 178 +++++------- 4 files changed, 346 insertions(+), 379 deletions(-) diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index 73052c1e5e..3bfc50fcb2 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -207,7 +207,7 @@ int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* (void)tFormatOffset(buf1, TSDB_OFFSET_LEN, &(pRsp->reqOffset)); (void)tFormatOffset(buf2, TSDB_OFFSET_LEN, &(pRsp->rspOffset)); - tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) send rsp, block num:%d, req:%s, rsp:%s,QID:0x%" PRIx64, + tqDebug("tmq poll vgId:%d consumer:0x%" PRIx64 " (epoch %d) start to send rsp, block num:%d, req:%s, rsp:%s,QID:0x%" PRIx64, vgId, pReq->consumerId, pReq->epoch, pRsp->blockNum, buf1, buf2, pReq->reqId); return tqDoSendDataRsp(&pMsg->info, pRsp, pReq->epoch, pReq->consumerId, type, sver, ever); diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index a2b6194375..61b59e4cd0 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -197,7 +197,9 @@ bool isValValidForTable(STqHandle* pHandle, SWalCont* pHead) { end: tDecoderClear(&dcoder); - return tbSuid == realTbSuid; + bool tmp = tbSuid == realTbSuid; + tqDebug("%s suid:%"PRId64" realSuid:%"PRId64" return:%d", __FUNCTION__, tbSuid, realTbSuid, tmp); + return tmp; } int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t reqId) { @@ -262,6 +264,8 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t END: *fetchOffset = offset; + tqDebug("vgId:%d, end to fetch wal, code:%d , index:%" PRId64 ", last:%" PRId64 " commit:%" PRId64 ", applied:%" PRId64 ", 0x%" PRIx64, + vgId, code, offset, lastVer, committedVer, appliedVer, id); return code; } @@ -273,6 +277,8 @@ bool tqGetTablePrimaryKey(STqReader* pReader) { } void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) { + tqDebug("%s:%p uid:%"PRId64, __FUNCTION__ , pReader, uid); + if (pReader == NULL) { return; } @@ -286,6 +292,7 @@ void tqSetTablePrimaryKey(STqReader* pReader, int64_t uid) { } STqReader* tqReaderOpen(SVnode* pVnode) { + tqDebug("%s:%p", __FUNCTION__ , pVnode); if (pVnode == NULL) { return NULL; } @@ -317,6 +324,7 @@ STqReader* tqReaderOpen(SVnode* pVnode) { } void tqReaderClose(STqReader* pReader) { + tqDebug("%s:%p", __FUNCTION__ , pReader); if (pReader == NULL) return; // close wal reader @@ -485,7 +493,7 @@ bool tqNextBlockInWal(STqReader* pReader, const char* id, int sourceExcluded) { } int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, int64_t ver) { -if (pReader == NULL) { + if (pReader == NULL) { return TSDB_CODE_INVALID_PARA; } pReader->msg.msgStr = msgStr; @@ -497,14 +505,13 @@ if (pReader == NULL) { tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen); int32_t code = tDecodeSubmitReq(&decoder, &pReader->submit); + tDecoderClear(&decoder); + if (code != 0) { - tDecoderClear(&decoder); tqError("DecodeSSubmitReq2 error, msgLen:%d, ver:%" PRId64, msgLen, ver); - return code; } - tDecoderClear(&decoder); - return 0; + return code; } SWalReader* tqGetWalReader(STqReader* pReader) { @@ -529,32 +536,25 @@ int64_t tqGetResultBlockTime(STqReader* pReader) { } bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { - if (pReader == NULL || pReader->msg.msgStr == NULL) { - return false; - } + int32_t code = false; + int32_t lino = 0; + int64_t uid = 0; + + TSDB_CHECK_NULL(pReader, code, lino, END, false); + TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false); + TSDB_CHECK_NULL(pReader->tbIdHash, code, lino, END, true); int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < numOfBlocks) { - tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver, - (pReader->nextBlk + 1), numOfBlocks, idstr); + tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver, (pReader->nextBlk + 1), numOfBlocks, idstr); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); - if (pSubmitTbData == NULL) { - return false; - } - if (pReader->tbIdHash == NULL) { - return true; - } - + TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false); + uid = pSubmitTbData->uid; void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); - if (ret != NULL) { - tqDebug("block found, ver:%" PRId64 ", uid:%" PRId64 ", %s", pReader->msg.ver, pSubmitTbData->uid, idstr); - return true; - } else { - tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid, - taosHashGetSize(pReader->tbIdHash), idstr); - } + TSDB_CHECK_CONDITION(ret == NULL, code, lino, END, true); + tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid, taosHashGetSize(pReader->tbIdHash), idstr); pReader->nextBlk++; } @@ -562,30 +562,40 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { pReader->nextBlk = 0; pReader->msg.msgStr = NULL; - return false; +END: + tqDebug("%s:%d, uid:%"PRId64",code:%d", __FUNCTION__, lino, uid, code); + return code; } bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) { - if (pReader == NULL || pReader->msg.msgStr == NULL) return false; + int32_t code = false; + int32_t lino = 0; + int64_t uid = 0; + + TSDB_CHECK_NULL(pReader, code, lino, END, false); + TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false); + TSDB_CHECK_NULL(filterOutUids, code, lino, END, true); int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); while (pReader->nextBlk < blockSz) { SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); - if (pSubmitTbData == NULL) return false; - if (filterOutUids == NULL) return true; - + TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false); + uid = pSubmitTbData->uid; void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t)); - if (ret == NULL) { - return true; - } + TSDB_CHECK_NULL(ret, code, lino, END, true); + pReader->nextBlk++; + tqDebug("discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid); } tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); pReader->nextBlk = 0; pReader->msg.msgStr = NULL; + tqDebug("all data blocks are filtered out"); - return false; +END: + tqDebug("%s:%d, uid:%"PRId64",code:%d", __FUNCTION__, lino, uid, code); + return code; } int32_t tqMaskBlock(SSchemaWrapper* pDst, SSDataBlock* pBlock, const SSchemaWrapper* pSrc, char* mask) { @@ -709,7 +719,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* if (pReader == NULL || pRes == NULL) { return TSDB_CODE_INVALID_PARA; } - tqTrace("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk); + tqDebug("tq reader retrieve data block %p, index:%d", pReader->msg.msgStr, pReader->nextBlk); int32_t code = 0; int32_t line = 0; STSchema* pTSchema = NULL; @@ -876,10 +886,6 @@ END: static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas, SSchemaWrapper* pSchemaWrapper, char* assigned, int32_t numOfRows, int32_t curRow, int32_t* lastRow) { - if (pReader == NULL || pSubmitTbData == NULL || blocks == NULL || schemas == NULL || pSchemaWrapper == NULL || - assigned == NULL || lastRow == NULL) { - return TSDB_CODE_INVALID_PARA; - } int32_t code = 0; SSchemaWrapper* pSW = NULL; SSDataBlock* block = NULL; @@ -909,15 +915,15 @@ static int32_t processBuildNew(STqReader* pReader, SSubmitTbData* pSubmitTbData, taosMemoryFreeClear(block); END: + if (code != 0) { + tqError("processBuildNew failed, code:%d", code); + } tDeleteSchemaWrapper(pSW); blockDataFreeRes(block); taosMemoryFree(block); return code; } static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) { - if (pReader == NULL || pSubmitTbData == NULL || blocks == NULL || schemas == NULL) { - return TSDB_CODE_INVALID_PARA; - } int32_t code = 0; int32_t curRow = 0; int32_t lastRow = 0; @@ -931,6 +937,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData TQ_NULL_GO_TO_END(pCol); int32_t numOfRows = pCol->nVal; int32_t numOfCols = taosArrayGetSize(pCols); + tqDebug("vgId:%d, tqProcessColData, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows); for (int32_t i = 0; i < numOfRows; i++) { bool buildNew = false; @@ -972,14 +979,14 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData pLastBlock->info.rows = curRow - lastRow; END: + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code); + } taosMemoryFree(assigned); return code; } int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArray* blocks, SArray* schemas) { - if (pReader == NULL || pSubmitTbData == NULL || blocks == NULL || schemas == NULL) { - return TSDB_CODE_INVALID_PARA; - } int32_t code = 0; STSchema* pTSchema = NULL; @@ -992,6 +999,8 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra SArray* pRows = pSubmitTbData->aRowP; int32_t numOfRows = taosArrayGetSize(pRows); pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version); + TQ_NULL_GO_TO_END(pTSchema); + tqDebug("vgId:%d, tqProcessRowData, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows); for (int32_t i = 0; i < numOfRows; i++) { bool buildNew = false; @@ -1031,16 +1040,16 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra pLastBlock->info.rows = curRow - lastRow; END: + if (code != TSDB_CODE_SUCCESS) { + tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code); + } taosMemoryFreeClear(pTSchema); taosMemoryFree(assigned); return code; } int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) { - if (pReader == NULL || blocks == NULL || schemas == NULL) { - return TSDB_CODE_INVALID_PARA; - } - tqTrace("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk); + tqDebug("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pSubmitTbData == NULL) { return terrno; diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 3419cd0020..ec200ba144 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -16,15 +16,16 @@ #include "tq.h" int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t numOfCols, int8_t precision) { - if (pBlock == NULL || pRsp == NULL) { - return TSDB_CODE_INVALID_PARA; - } + int32_t code = TDB_CODE_SUCCESS; + int32_t lino = 0; + void* buf = NULL; + TSDB_CHECK_NULL(pBlock, code, lino, END, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA); + size_t dataEncodeBufSize = blockGetEncodeSize(pBlock); int32_t dataStrLen = sizeof(SRetrieveTableRspForTmq) + dataEncodeBufSize; - void* buf = taosMemoryCalloc(1, dataStrLen); - if (buf == NULL) { - return terrno; - } + buf = taosMemoryCalloc(1, dataStrLen); + TSDB_CHECK_NULL(buf, code, lino, END, terrno); SRetrieveTableRspForTmq* pRetrieve = (SRetrieveTableRspForTmq*)buf; pRetrieve->version = 1; @@ -33,49 +34,49 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t pRetrieve->numOfRows = htobe64((int64_t)pBlock->info.rows); int32_t actualLen = blockEncode(pBlock, pRetrieve->data, dataEncodeBufSize, numOfCols); - if(actualLen < 0){ - taosMemoryFree(buf); - return terrno; - } + TSDB_CHECK_CONDITION(actualLen >= 0, code, lino, END, terrno); actualLen += sizeof(SRetrieveTableRspForTmq); - if (taosArrayPush(pRsp->blockDataLen, &actualLen) == NULL){ - taosMemoryFree(buf); - return terrno; - } - if (taosArrayPush(pRsp->blockData, &buf) == NULL) { - taosMemoryFree(buf); - return terrno; - } + TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &actualLen), code, lino, END, terrno); + TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno); - return TSDB_CODE_SUCCESS; + tqDebug("add block data to response success:%p, blockDataLen:%d, blockData:%p", pRsp->blockDataLen, actualLen, pRsp->blockData); +END: + if (code != TSDB_CODE_SUCCESS) { + taosMemoryFree(buf); + tqError("%s failed at %d, failed to add block data to response:%s", __FUNCTION__, lino, tstrerror(code)); + } + return code; } static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, int32_t n) { - if (pRsp == NULL || pTq == NULL) { - return TSDB_CODE_INVALID_PARA; - } + int32_t code = TDB_CODE_SUCCESS; + int32_t lino = 0; SMetaReader mr = {0}; + + TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA); + metaReaderDoInit(&mr, pTq->pVnode->pMeta, META_READER_LOCK); - int32_t code = metaReaderGetTableEntryByUidCache(&mr, uid); - if (code < 0) { - metaReaderClear(&mr); - return code; - } + code = metaReaderGetTableEntryByUidCache(&mr, uid); + TSDB_CHECK_CODE(code, lino, END); for (int32_t i = 0; i < n; i++) { char* tbName = taosStrdup(mr.me.name); - if (tbName == NULL) { - metaReaderClear(&mr); - return terrno; - } + TSDB_CHECK_NULL(tbName, code, lino, END, terrno); if(taosArrayPush(pRsp->blockTbName, &tbName) == NULL){ - tqError("failed to push tbName to blockTbName:%s", tbName); + tqError("failed to push tbName to blockTbName:%s, uid:%"PRId64, tbName, uid); continue; } + tqDebug("add tbName to response success tbname:%s, uid:%"PRId64, tbName, uid); + } + +END: + if (code != TSDB_CODE_SUCCESS) { + tqError("%s failed at %d, failed to add tbName to response:%s, uid:%"PRId64, __FUNCTION__, lino, tstrerror(code), uid); } metaReaderClear(&mr); - return 0; + return code; } int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, SSDataBlock** res) { @@ -96,109 +97,130 @@ int32_t getDataBlock(qTaskInfo_t task, const STqHandle* pHandle, int32_t vgId, S return 0; } -int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) { - if (pTq == NULL || pHandle == NULL || pRsp == NULL || pOffset == NULL || pRequest == NULL){ - return TSDB_CODE_INVALID_PARA; - } - int32_t vgId = TD_VID(pTq->pVnode); +static int32_t tqProcessReplayRsp(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, const SMqPollReq* pRequest, SSDataBlock* pDataBlock, qTaskInfo_t task){ int32_t code = 0; - int32_t line = 0; + int32_t lino = 0; + + if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) { + blockDataDestroy(pHandle->block); + pHandle->block = NULL; + } + if (pHandle->block == NULL) { + if (pDataBlock == NULL) { + goto END; + } + + STqOffsetVal offset = {0}; + code = qStreamExtractOffset(task, &offset); + TSDB_CHECK_CODE(code, lino, END); + + pHandle->block = NULL; + + code = createOneDataBlock(pDataBlock, true, &pHandle->block); + TSDB_CHECK_CODE(code, lino, END); + + pHandle->blockTime = offset.ts; + tOffsetDestroy(&offset); + int32_t vgId = TD_VID(pTq->pVnode); + code = getDataBlock(task, pHandle, vgId, &pDataBlock); + TSDB_CHECK_CODE(code, lino, END); + } + + const STqExecHandle* pExec = &pHandle->execHandle; + code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); + TSDB_CHECK_CODE(code, lino, END); + + pRsp->blockNum++; + if (pDataBlock == NULL) { + blockDataDestroy(pHandle->block); + pHandle->block = NULL; + } else { + code = copyDataBlock(pHandle->block, pDataBlock); + TSDB_CHECK_CODE(code, lino, END); + + STqOffsetVal offset = {0}; + code = qStreamExtractOffset(task, &offset); + TSDB_CHECK_CODE(code, lino, END); + + pRsp->sleepTime = offset.ts - pHandle->blockTime; + pHandle->blockTime = offset.ts; + tOffsetDestroy(&offset); + } + +END: + if (code != TSDB_CODE_SUCCESS) { + tqError("%s failed at %d, failed to process replay response:%s", __FUNCTION__, lino, tstrerror(code)); + } + return code; +} + +int32_t tqScanData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, STqOffsetVal* pOffset, const SMqPollReq* pRequest) { + int32_t code = 0; + int32_t lino = 0; + TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pOffset, code, lino, END, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pRequest, code, lino, END, TSDB_CODE_INVALID_PARA); + + int32_t vgId = TD_VID(pTq->pVnode); int32_t totalRows = 0; const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); - TSDB_CHECK_CODE(code, line, END); + TSDB_CHECK_CODE(code, lino, END); qStreamSetSourceExcluded(task, pRequest->sourceExcluded); uint64_t st = taosGetTimestampMs(); while (1) { SSDataBlock* pDataBlock = NULL; code = getDataBlock(task, pHandle, vgId, &pDataBlock); - TSDB_CHECK_CODE(code, line, END); + TSDB_CHECK_CODE(code, lino, END); if (pRequest->enableReplay) { - if (IS_OFFSET_RESET_TYPE(pRequest->reqOffset.type) && pHandle->block != NULL) { - blockDataDestroy(pHandle->block); - pHandle->block = NULL; - } - if (pHandle->block == NULL) { - if (pDataBlock == NULL) { - break; - } - - STqOffsetVal offset = {0}; - code = qStreamExtractOffset(task, &offset); - TSDB_CHECK_CODE(code, line, END); - - pHandle->block = NULL; - - code = createOneDataBlock(pDataBlock, true, &pHandle->block); - TSDB_CHECK_CODE(code, line, END); - - pHandle->blockTime = offset.ts; - tOffsetDestroy(&offset); - code = getDataBlock(task, pHandle, vgId, &pDataBlock); - TSDB_CHECK_CODE(code, line, END); - } - - code = tqAddBlockDataToRsp(pHandle->block, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); - TSDB_CHECK_CODE(code, line, END); - - pRsp->blockNum++; - if (pDataBlock == NULL) { - blockDataDestroy(pHandle->block); - pHandle->block = NULL; - } else { - code = copyDataBlock(pHandle->block, pDataBlock); - TSDB_CHECK_CODE(code, line, END); - - STqOffsetVal offset = {0}; - code = qStreamExtractOffset(task, &offset); - TSDB_CHECK_CODE(code, line, END); - - pRsp->sleepTime = offset.ts - pHandle->blockTime; - pHandle->blockTime = offset.ts; - tOffsetDestroy(&offset); - } + code = tqProcessReplayRsp(pTq, pHandle, pRsp, pRequest, pDataBlock, task); + TSDB_CHECK_CODE(code, lino, END); break; - } else { - if (pDataBlock == NULL) { - break; - } - code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); - TSDB_CHECK_CODE(code, line, END); + } + if (pDataBlock == NULL) { + break; + } + code = tqAddBlockDataToRsp(pDataBlock, pRsp, pExec->numOfCols, pTq->pVnode->config.tsdbCfg.precision); + TSDB_CHECK_CODE(code, lino, END); - pRsp->blockNum++; - totalRows += pDataBlock->info.rows; - if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) { - break; - } + pRsp->blockNum++; + totalRows += pDataBlock->info.rows; + if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) { + break; } } - tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d", - pHandle->consumerId, vgId, pRsp->blockNum, totalRows); + tqDebug("consumer:0x%" PRIx64 " vgId:%d tmq task executed finished, total blocks:%d, totalRows:%d", pHandle->consumerId, vgId, pRsp->blockNum, totalRows); code = qStreamExtractOffset(task, &pRsp->rspOffset); + END: if (code != 0) { - tqError("consumer:0x%" PRIx64 " vgId:%d tmq task executed error, line:%d code:%d", pHandle->consumerId, vgId, line, - code); + tqError("%s failed at %d, tmq task executed error msg:%s", __FUNCTION__, lino, tstrerror(code)); } return code; } int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBatchMetaRsp* pBatchMetaRsp, STqOffsetVal* pOffset) { - if (pTq == NULL || pHandle == NULL || pRsp == NULL || pBatchMetaRsp == NULL || pOffset == NULL) { - return TSDB_CODE_INVALID_PARA; - } + int32_t code = 0; + int32_t lino = 0; + char* tbName = NULL; + SSchemaWrapper* pSW = NULL; + TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pOffset, code, lino, END, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pBatchMetaRsp, code, lino, END, TSDB_CODE_INVALID_PARA); const STqExecHandle* pExec = &pHandle->execHandle; qTaskInfo_t task = pExec->task; - int code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); - if (code != 0) { - return code; - } + code = qStreamPrepareScan(task, pOffset, pHandle->execHandle.subType); + TSDB_CHECK_CODE(code, lino, END); int32_t rowCnt = 0; while (1) { @@ -206,52 +228,37 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat uint64_t ts = 0; tqDebug("tmqsnap task start to execute"); code = qExecTask(task, &pDataBlock, &ts); - if (code != 0) { - tqError("vgId:%d, task exec error since %s", pTq->pVnode->config.vgId, tstrerror(code)); - return code; - } - + TSDB_CHECK_CODE(code, lino, END); tqDebug("tmqsnap task execute end, get %p", pDataBlock); if (pDataBlock != NULL && pDataBlock->info.rows > 0) { if (pRsp->withTbName) { - char* tbName = taosStrdup(qExtractTbnameFromTask(task)); - if (tbName == NULL) { - tqError("vgId:%d, failed to add tbname to rsp msg, null", pTq->pVnode->config.vgId); - return terrno; - } - if (taosArrayPush(pRsp->blockTbName, &tbName) == NULL){ - tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId); - continue; - } + tbName = taosStrdup(qExtractTbnameFromTask(task)); + TSDB_CHECK_NULL(tbName, code, lino, END, terrno); + TSDB_CHECK_NULL(taosArrayPush(pRsp->blockTbName, &tbName), code, lino, END, terrno); + tbName = NULL; } if (pRsp->withSchema) { - SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); - if(taosArrayPush(pRsp->blockSchema, &pSW) == NULL){ - tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId); - continue; - } + pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task)); + TSDB_CHECK_NULL(pSW, code, lino, END, terrno); + TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno); + pSW = NULL; } - if (tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock), - pTq->pVnode->config.tsdbCfg.precision) != 0) { - tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId); - continue; - } + code = tqAddBlockDataToRsp(pDataBlock, pRsp, taosArrayGetSize(pDataBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision); + TSDB_CHECK_CODE(code, lino, END); pRsp->blockNum++; rowCnt += pDataBlock->info.rows; - if (rowCnt <= tmqRowSize) continue; - + if (rowCnt <= tmqRowSize) { + continue; + } } // get meta SMqBatchMetaRsp* tmp = qStreamExtractMetaMsg(task); if (taosArrayGetSize(tmp->batchMetaReq) > 0) { code = qStreamExtractOffset(task, &tmp->rspOffset); - if (code) { - return code; - } - + TSDB_CHECK_CODE(code, lino, END); *pBatchMetaRsp = *tmp; tqDebug("tmqsnap task get meta"); break; @@ -259,16 +266,13 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat if (pDataBlock == NULL) { code = qStreamExtractOffset(task, pOffset); - if (code) { - break; - } + TSDB_CHECK_CODE(code, lino, END); if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) { continue; } - tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), - pHandle->snapshotVer + 1); + tqDebug("tmqsnap vgId: %d, tsdb consume over, switch to wal, ver %" PRId64, TD_VID(pTq->pVnode), pHandle->snapshotVer + 1); code = qStreamExtractOffset(task, &pRsp->rspOffset); break; } @@ -280,119 +284,93 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat } } + tqDebug("%s:%d success", __FUNCTION__, lino); +END: + if (code != 0){ + tqDebug("%s:%d failed, code:%s", __FUNCTION__, lino, tstrerror(code) ); + } + taosMemoryFree(pSW); + taosMemoryFree(tbName); return code; } static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){ - if (pRsp == NULL || pCreateTbReq == NULL) { - return TSDB_CODE_INVALID_PARA; - } int32_t code = 0; + int32_t lino = 0; void* createReq = NULL; + TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pCreateTbReq, code, lino, END, TSDB_CODE_INVALID_PARA); + if (pRsp->createTableNum == 0) { pRsp->createTableLen = taosArrayInit(0, sizeof(int32_t)); - if (pRsp->createTableLen == NULL) { - code = terrno; - goto END; - } + TSDB_CHECK_NULL(pRsp->createTableLen, code, lino, END, terrno); pRsp->createTableReq = taosArrayInit(0, sizeof(void*)); - if (pRsp->createTableReq == NULL) { - code = terrno; - goto END; - } + TSDB_CHECK_NULL(pRsp->createTableReq, code, lino, END, terrno); } uint32_t len = 0; tEncodeSize(tEncodeSVCreateTbReq, pCreateTbReq, len, code); - if (TSDB_CODE_SUCCESS != code) { - goto END; - } + TSDB_CHECK_CODE(code, lino, END); createReq = taosMemoryCalloc(1, len); - if (createReq == NULL){ - code = terrno; - goto END; - } + TSDB_CHECK_NULL(createReq, code, lino, END, terrno); + SEncoder encoder = {0}; tEncoderInit(&encoder, createReq, len); code = tEncodeSVCreateTbReq(&encoder, pCreateTbReq); tEncoderClear(&encoder); - if (code < 0) { - goto END; - } - if (taosArrayPush(pRsp->createTableLen, &len) == NULL){ - code = terrno; - goto END; - } - if (taosArrayPush(pRsp->createTableReq, &createReq) == NULL){ - code = terrno; - goto END; - } + TSDB_CHECK_CODE(code, lino, END); + TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno); + TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno); pRsp->createTableNum++; + tqDebug("build create table info msg success"); - return 0; END: - taosMemoryFree(createReq); + if (code != 0){ + tqError("%s failed at %d, failed to build create table info msg:%s", __FUNCTION__, lino, tstrerror(code)); + taosMemoryFree(createReq); + } return code; } -static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){ - if (pTq == NULL || pHandle == NULL || pRsp == NULL || totalRows == NULL) { - return; - } +static int32_t tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){ int32_t code = 0; - STqExecHandle* pExec = &pHandle->execHandle; - STqReader* pReader = pExec->pTqReader; + int32_t lino = 0; SArray* pBlocks = NULL; SArray* pSchemas = NULL; + + STqExecHandle* pExec = &pHandle->execHandle; + STqReader* pReader = pExec->pTqReader; + pBlocks = taosArrayInit(0, sizeof(SSDataBlock)); - if (pBlocks == NULL) { - code = terrno; - goto END; - } + TSDB_CHECK_NULL(pBlocks, code, lino, END, terrno); pSchemas = taosArrayInit(0, sizeof(void*)); - if(pSchemas == NULL){ - code = terrno; - goto END; - } + TSDB_CHECK_NULL(pSchemas, code, lino, END, terrno); SSubmitTbData* pSubmitTbDataRet = NULL; int64_t createTime = INT64_MAX; code = tqRetrieveTaosxBlock(pReader, pBlocks, pSchemas, &pSubmitTbDataRet, &createTime); - if (code != 0) { - tqError("vgId:%d, failed to retrieve block", pTq->pVnode->config.vgId); - goto END; - } - - if ((pSubmitTbDataRet->flags & sourceExcluded) != 0) { - goto END; - } + TSDB_CHECK_CODE(code, lino, END); + bool tmp = (pSubmitTbDataRet->flags & sourceExcluded) != 0; + TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS); if (pRsp->withTbName) { int64_t uid = pExec->pTqReader->lastBlkUid; code = tqAddTbNameToRsp(pTq, uid, pRsp, taosArrayGetSize(pBlocks)); - if (code != 0) { - tqError("vgId:%d, failed to add tbname to rsp msg", pTq->pVnode->config.vgId); - goto END; - } + TSDB_CHECK_CODE(code, lino, END); } if (pHandle->fetchMeta != WITH_DATA && pSubmitTbDataRet->pCreateTbReq != NULL) { if (pSubmitTbDataRet->ctimeMs - createTime <= 1000) { // judge if table is already created to avoid sending crateTbReq code = buildCreateTbInfo(pRsp, pSubmitTbDataRet->pCreateTbReq); - if (code != 0){ - tqError("vgId:%d, failed to build create table info", pTq->pVnode->config.vgId); - goto END; - } + TSDB_CHECK_CODE(code, lino, END); } } - if (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL) { - goto END; - } + tmp = (pHandle->fetchMeta == ONLY_META && pSubmitTbDataRet->pCreateTbReq == NULL); + TSDB_CHECK_CONDITION(!tmp, code, lino, END, TSDB_CODE_SUCCESS); for (int32_t i = 0; i < taosArrayGetSize(pBlocks); i++) { SSDataBlock* pBlock = taosArrayGet(pBlocks, i); if (pBlock == NULL) { continue; } - if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock), - pTq->pVnode->config.tsdbCfg.precision) != 0){ + if (tqAddBlockDataToRsp(pBlock, pRsp, taosArrayGetSize(pBlock->pDataBlock), pTq->pVnode->config.tsdbCfg.precision) != 0){ tqError("vgId:%d, failed to add block to rsp msg", pTq->pVnode->config.vgId); continue; } @@ -405,38 +383,46 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int } pRsp->blockNum++; } - - taosArrayDestroy(pBlocks); - taosArrayDestroy(pSchemas); - return; - + tqDebug("vgId:%d, process sub data success", pTq->pVnode->config.vgId); END: - taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); - taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper); + if (code != 0){ + tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code)); + taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes); + taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper); + } else { + taosArrayDestroy(pBlocks); + taosArrayDestroy(pSchemas); + } + return code; } -int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, - int8_t sourceExcluded) { - if (pTq == NULL || pHandle == NULL || pRsp == NULL || totalRows == NULL) { - return TSDB_CODE_INVALID_PARA; - } +int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) { + int32_t code = 0; + int32_t lino = 0; + TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pTq, code, lino, END, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(pHandle, code, lino, END, TSDB_CODE_INVALID_PARA); + TSDB_CHECK_NULL(totalRows, code, lino, END, TSDB_CODE_INVALID_PARA); STqExecHandle* pExec = &pHandle->execHandle; - int32_t code = 0; STqReader* pReader = pExec->pTqReader; code = tqReaderSetSubmitMsg(pReader, submit.msgStr, submit.msgLen, submit.ver); - if (code != 0) { - return code; - } + TSDB_CHECK_CODE(code, lino, END); if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { while (tqNextBlockImpl(pReader, NULL)) { - tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded); + code = tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded); + TSDB_CHECK_CODE(code, lino, END); } } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { - tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded); + code = tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded); + TSDB_CHECK_CODE(code, lino, END); } } +END: + if (code != 0){ + tqError("%s failed at %d, failed to scan log:%s", __FUNCTION__, lino, tstrerror(code)); + } return code; } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index f6a8563c70..2c27f7e515 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -21,21 +21,27 @@ static int32_t tqSendBatchMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, c const SMqBatchMetaRsp* pRsp, int32_t vgId); int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { - if (pRsp == NULL) { - return TSDB_CODE_INVALID_PARA; - } - pRsp->blockData = taosArrayInit(0, sizeof(void*)); - pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); + int32_t code = TDB_CODE_SUCCESS; + int32_t lino = 0; + tqDebug("%s called", __FUNCTION__ ); + TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA); - if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL) { - return terrno; - } + pRsp->blockData = taosArrayInit(0, sizeof(void*)); + TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno); + + pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); + TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno); tOffsetCopy(&pRsp->reqOffset, &pOffset); tOffsetCopy(&pRsp->rspOffset, &pOffset); pRsp->withTbName = 0; pRsp->withSchema = false; - return 0; + +END: + if (code != 0){ + tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code)); + } + return code; } void tqUpdateNodeStage(STQ* pTq, bool isLeader) { @@ -44,45 +50,37 @@ void tqUpdateNodeStage(STQ* pTq, bool isLeader) { } static int32_t tqInitTaosxRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset) { - if (pRsp == NULL) { - return TSDB_CODE_INVALID_PARA; - } + int32_t code = TDB_CODE_SUCCESS; + int32_t lino = 0; + tqDebug("%s called", __FUNCTION__ ); + TSDB_CHECK_NULL(pRsp, code, lino, END, TSDB_CODE_INVALID_PARA); tOffsetCopy(&pRsp->reqOffset, &pOffset); tOffsetCopy(&pRsp->rspOffset, &pOffset); pRsp->withTbName = 1; pRsp->withSchema = 1; pRsp->blockData = taosArrayInit(0, sizeof(void*)); + TSDB_CHECK_NULL(pRsp->blockData, code, lino, END, terrno);\ + pRsp->blockDataLen = taosArrayInit(0, sizeof(int32_t)); + TSDB_CHECK_NULL(pRsp->blockDataLen, code, lino, END, terrno); + pRsp->blockTbName = taosArrayInit(0, sizeof(void*)); + TSDB_CHECK_NULL(pRsp->blockTbName, code, lino, END, terrno); + pRsp->blockSchema = taosArrayInit(0, sizeof(void*)); + TSDB_CHECK_NULL(pRsp->blockSchema, code, lino, END, terrno); - if (pRsp->blockData == NULL || pRsp->blockDataLen == NULL || - pRsp->blockTbName == NULL || pRsp->blockSchema == NULL) { - if (pRsp->blockData != NULL) { - taosArrayDestroy(pRsp->blockData); - pRsp->blockData = NULL; - } - if (pRsp->blockDataLen != NULL) { - taosArrayDestroy(pRsp->blockDataLen); - pRsp->blockDataLen = NULL; - } - - if (pRsp->blockTbName != NULL) { - taosArrayDestroy(pRsp->blockTbName); - pRsp->blockTbName = NULL; - } - - if (pRsp->blockSchema != NULL) { - taosArrayDestroy(pRsp->blockSchema); - pRsp->blockSchema = NULL; - } - - return terrno; +END: + if (code != 0){ + tqError("%s failed at:%d, code:%s", __FUNCTION__ , lino, tstrerror(code)); + taosArrayDestroy(pRsp->blockData); + taosArrayDestroy(pRsp->blockDataLen); + taosArrayDestroy(pRsp->blockTbName); + taosArrayDestroy(pRsp->blockSchema); } - - return 0; + return code; } static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, @@ -151,23 +149,19 @@ static int32_t extractResetOffsetVal(STqOffsetVal* pOffsetVal, STQ* pTq, STqHand static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* pOffset) { - if (pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || pOffset == NULL) { - return TSDB_CODE_INVALID_PARA; - } + int32_t code = TDB_CODE_SUCCESS; + int32_t lino = 0; + tqDebug("%s called", __FUNCTION__ ); uint64_t consumerId = pRequest->consumerId; int32_t vgId = TD_VID(pTq->pVnode); terrno = 0; SMqDataRsp dataRsp = {0}; - int code = tqInitDataRsp(&dataRsp, *pOffset); - if (code != 0) { - goto end; - } + code = tqInitDataRsp(&dataRsp, *pOffset); + TSDB_CHECK_CODE(code, lino, end); code = qSetTaskId(pHandle->execHandle.task, consumerId, pRequest->reqId); - if (code != 0) { - goto end; - } + TSDB_CHECK_CODE(code, lino, end); code = tqScanData(pTq, pHandle, &dataRsp, pOffset, pRequest); if (code != 0 && terrno != TSDB_CODE_WAL_LOG_NOT_EXIST) { @@ -191,15 +185,21 @@ static int32_t extractDataAndRspForNormalSubscribe(STQ* pTq, STqHandle* pHandle, tOffsetCopy(&dataRsp.reqOffset, pOffset); code = tqSendDataRsp(pHandle, pMsg, pRequest, &dataRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); -end : { - char buf[TSDB_OFFSET_LEN] = {0}; - tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset); - tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64 - " code:%d", - consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, code); - tDeleteMqDataRsp(&dataRsp); - return code; -} +end: + { + char buf[TSDB_OFFSET_LEN] = {0}; + tFormatOffset(buf, TSDB_OFFSET_LEN, &dataRsp.rspOffset); + if (code != 0){ + tqError("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64 " error msg:%s, line:%d", + consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId, tstrerror(code), lino); + } else { + tqDebug("tmq poll: consumer:0x%" PRIx64 ", subkey %s, vgId:%d, rsp block:%d, rsp offset type:%s,QID:0x%" PRIx64 " success", + consumerId, pHandle->subKey, vgId, dataRsp.blockNum, buf, pRequest->reqId); + } + + tDeleteMqDataRsp(&dataRsp); + return code; + } } #define PROCESS_EXCLUDED_MSG(TYPE, DECODE_FUNC, DELETE_FUNC) \ @@ -224,9 +224,6 @@ static void tDeleteCommon(void* parm) {} static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg, STqOffsetVal* offset) { - if (pTq == NULL || pHandle == NULL || pRequest == NULL || pMsg == NULL || offset == NULL) { - return TSDB_CODE_INVALID_PARA; - } int32_t vgId = TD_VID(pTq->pVnode); SMqDataRsp taosxRsp = {0}; SMqBatchMetaRsp btMetaRsp = {0}; @@ -238,17 +235,13 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (taosArrayGetSize(btMetaRsp.batchMetaReq) > 0) { code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); - tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 - ",ts:%" PRId64, - pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid, - btMetaRsp.rspOffset.ts); + tqDebug("tmq poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send meta offset type:%d,uid:%" PRId64 ",ts:%" PRId64, + pRequest->consumerId, pHandle->subKey, vgId, btMetaRsp.rspOffset.type, btMetaRsp.rspOffset.uid,btMetaRsp.rspOffset.ts); goto END; } - tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64 - ",ts:%" PRId64, - pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, - taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts); + tqDebug("taosx poll: consumer:0x%" PRIx64 " subkey:%s vgId:%d, send data blockNum:%d, offset type:%d,uid:%" PRId64",ts:%" PRId64, + pRequest->consumerId, pHandle->subKey, vgId, taosxRsp.blockNum, taosxRsp.rspOffset.type, taosxRsp.rspOffset.uid, taosxRsp.rspOffset.ts); if (taosxRsp.blockNum > 0) { code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto END; @@ -277,16 +270,10 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (totalMetaRows > 0) { tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer); code = tqSendBatchMetaPollRsp(pHandle, pMsg, pRequest, &btMetaRsp, vgId); - if (totalRows != 0) { - tqError("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, totalRows error, vgId:%d offset %" PRId64, - pRequest->consumerId, pRequest->epoch, vgId, fetchVer); - code = code == 0 ? TSDB_CODE_TQ_INTERNAL_ERROR : code; - } goto END; } tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendDataRsp( - pHandle, pMsg, pRequest, &taosxRsp, + code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto END; } @@ -299,8 +286,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (pHead->msgType != TDMT_VND_SUBMIT) { if (totalRows > 0) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer); - code = tqSendDataRsp( - pHandle, pMsg, pRequest, &taosxRsp, + code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto END; } @@ -331,15 +317,9 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if (!btMetaRsp.batchMetaReq) { btMetaRsp.batchMetaReq = taosArrayInit(4, POINTER_BYTES); - if (btMetaRsp.batchMetaReq == NULL) { - code = TAOS_GET_TERRNO(terrno); - goto END; - } + TQ_NULL_GO_TO_END(btMetaRsp.batchMetaReq); btMetaRsp.batchMetaLen = taosArrayInit(4, sizeof(int32_t)); - if (btMetaRsp.batchMetaLen == NULL) { - code = TAOS_GET_TERRNO(terrno); - goto END; - } + TQ_NULL_GO_TO_END(btMetaRsp.batchMetaLen); } fetchVer++; @@ -355,10 +335,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } int32_t tLen = sizeof(SMqRspHead) + len; void* tBuf = taosMemoryCalloc(1, tLen); - if (tBuf == NULL) { - code = TAOS_GET_TERRNO(terrno); - goto END; - } + TQ_NULL_GO_TO_END(tBuf); void* metaBuff = POINTER_SHIFT(tBuf, sizeof(SMqRspHead)); SEncoder encoder = {0}; tEncoderInit(&encoder, metaBuff, len); @@ -369,14 +346,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, tqError("tmq extract meta from log, tEncodeMqMetaRsp error"); continue; } - if (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf) == NULL) { - code = TAOS_GET_TERRNO(terrno); - goto END; - } - if (taosArrayPush(btMetaRsp.batchMetaLen, &tLen) == NULL) { - code = TAOS_GET_TERRNO(terrno); - goto END; - } + TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaReq, &tBuf)); + TQ_NULL_GO_TO_END (taosArrayPush(btMetaRsp.batchMetaLen, &tLen)); totalMetaRows++; if ((taosArrayGetSize(btMetaRsp.batchMetaReq) >= tmqRowSize) || (taosGetTimestampMs() - st > 1000)) { tqOffsetResetToLog(&btMetaRsp.rspOffset, fetchVer); @@ -399,17 +370,11 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, .ver = pHead->version, }; - code = tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded); - if (code < 0) { - tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, - pRequest->subKey); - goto END; - } + TQ_ERR_GO_TO_END(tqTaosxScanLog(pTq, pHandle, submit, &taosxRsp, &totalRows, pRequest->sourceExcluded)); if (totalRows >= tmqRowSize || (taosGetTimestampMs() - st > 1000)) { tqOffsetResetToLog(&taosxRsp.rspOffset, fetchVer + 1); - code = tqSendDataRsp( - pHandle, pMsg, pRequest, &taosxRsp, + code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, taosxRsp.createTableNum > 0 ? TMQ_MSG_TYPE__POLL_DATA_META_RSP : TMQ_MSG_TYPE__POLL_DATA_RSP, vgId); goto END; } else { @@ -419,6 +384,10 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } END: + if (code != 0){ + tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, + pRequest->subKey); + } tDeleteMqBatchMetaRsp(&btMetaRsp); tDeleteSTaosxRsp(&taosxRsp); return code; @@ -457,6 +426,9 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ } END: + if (code != 0){ + uError("failed to extract data for mq, code:%d", code); + } tOffsetDestroy(&reqOffset); return code; }