diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 61b59e4cd0..937ca80bcc 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -500,7 +500,7 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i pReader->msg.msgLen = msgLen; pReader->msg.ver = ver; - tqDebug("tq reader set msg %p %d", msgStr, msgLen); + tqDebug("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen); SDecoder decoder = {0}; tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen); @@ -539,31 +539,29 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { int32_t code = false; int32_t lino = 0; int64_t uid = 0; - TSDB_CHECK_NULL(pReader, code, lino, END, false); TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false); TSDB_CHECK_NULL(pReader->tbIdHash, code, lino, END, true); - int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData); - while (pReader->nextBlk < numOfBlocks) { - tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver, (pReader->nextBlk + 1), numOfBlocks, idstr); - + int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData); + while (pReader->nextBlk < blockSz) { SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false); uid = pSubmitTbData->uid; void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); TSDB_CHECK_CONDITION(ret == NULL, code, lino, END, true); - tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid, taosHashGetSize(pReader->tbIdHash), idstr); + tqDebug("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%"PRId64, pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid); pReader->nextBlk++; } tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); pReader->nextBlk = 0; pReader->msg.msgStr = NULL; + tqDebug("iterator data block end, block progress:%d/%d, uid:%"PRId64, pReader->nextBlk, blockSz, uid); END: - tqDebug("%s:%d, uid:%"PRId64",code:%d", __FUNCTION__, lino, uid, code); + tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid); return code; } @@ -583,18 +581,17 @@ bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) { uid = pSubmitTbData->uid; void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t)); TSDB_CHECK_NULL(ret, code, lino, END, true); - + tqDebug("iterator data block in hash continue, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid); pReader->nextBlk++; - tqDebug("discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid); } tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE); pReader->nextBlk = 0; pReader->msg.msgStr = NULL; - tqDebug("all data blocks are filtered out"); + tqDebug("iterator data block end, block progress:%d/%d, uid:%"PRId64, pReader->nextBlk, blockSz, uid); END: - tqDebug("%s:%d, uid:%"PRId64",code:%d", __FUNCTION__, lino, uid, code); + tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid); return code; } @@ -856,7 +853,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char* END: if (code != 0) { - tqError("tqRetrieveDataBlock failed, line:%d, code:%d", line, code); + tqError("tqRetrieveDataBlock failed, line:%d, msg:%s", line, tstrerror(code)); } taosMemoryFreeClear(pTSchema); return code; @@ -937,7 +934,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData TQ_NULL_GO_TO_END(pCol); int32_t numOfRows = pCol->nVal; int32_t numOfCols = taosArrayGetSize(pCols); - tqDebug("vgId:%d, tqProcessColData, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows); + tqDebug("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows); for (int32_t i = 0; i < numOfRows; i++) { bool buildNew = false; @@ -977,7 +974,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData } SSDataBlock* pLastBlock = taosArrayGetLast(blocks); pLastBlock->info.rows = curRow - lastRow; - + tqDebug("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows, (int)taosArrayGetSize(blocks)); END: if (code != TSDB_CODE_SUCCESS) { tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code); @@ -1000,7 +997,7 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra int32_t numOfRows = taosArrayGetSize(pRows); pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version); TQ_NULL_GO_TO_END(pTSchema); - tqDebug("vgId:%d, tqProcessRowData, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows); + tqDebug("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows); for (int32_t i = 0; i < numOfRows; i++) { bool buildNew = false; @@ -1039,6 +1036,7 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra SSDataBlock* pLastBlock = taosArrayGetLast(blocks); pLastBlock->info.rows = curRow - lastRow; + tqDebug("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows, (int)taosArrayGetSize(blocks)); END: if (code != TSDB_CODE_SUCCESS) { tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code); @@ -1049,7 +1047,7 @@ END: } int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) { - tqDebug("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk); + tqDebug("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pSubmitTbData == NULL) { return terrno; diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index ec200ba144..b4dc610a6a 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -39,7 +39,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &actualLen), code, lino, END, terrno); TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno); - tqDebug("add block data to response success:%p, blockDataLen:%d, blockData:%p", pRsp->blockDataLen, actualLen, pRsp->blockData); + tqDebug("add block data to block array, blockDataLen:%d, blockData:%p", actualLen, buf); END: if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(buf); @@ -332,7 +332,7 @@ END: return code; } -static int32_t tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){ +static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){ int32_t code = 0; int32_t lino = 0; SArray* pBlocks = NULL; @@ -383,7 +383,7 @@ static int32_t tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, } pRsp->blockNum++; } - tqDebug("vgId:%d, process sub data success", pTq->pVnode->config.vgId); + tqDebug("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows); END: if (code != 0){ tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code)); @@ -393,7 +393,6 @@ END: taosArrayDestroy(pBlocks); taosArrayDestroy(pSchemas); } - return code; } int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) { @@ -410,13 +409,11 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData if (pExec->subType == TOPIC_SUB_TYPE__TABLE) { while (tqNextBlockImpl(pReader, NULL)) { - code = tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded); - TSDB_CHECK_CODE(code, lino, END); + tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded); } } else if (pExec->subType == TOPIC_SUB_TYPE__DB) { while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) { - code = tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded); - TSDB_CHECK_CODE(code, lino, END); + tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded); } } diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index 2c27f7e515..6ecb1b1b4d 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -279,8 +279,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, } SWalCont* pHead = &pHandle->pWalReader->pHead->head; - tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d", - pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType); + tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %s", + pRequest->consumerId, pRequest->epoch, vgId, fetchVer, TMSG_INFO(pHead->msgType)); // process meta if (pHead->msgType != TDMT_VND_SUBMIT) { @@ -385,7 +385,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, END: if (code != 0){ - tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, + tqError("tmq poll: tqTaosxScanLog error. consumerId:0x%" PRIx64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId, pRequest->subKey); } tDeleteMqBatchMetaRsp(&btMetaRsp); @@ -427,7 +427,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ END: if (code != 0){ - uError("failed to extract data for mq, code:%d", code); + uError("failed to extract data for mq, msg:%s", tstrerror(code)); } tOffsetDestroy(&reqOffset); return code;