fix:[TD-33396]add log for tmq

This commit is contained in:
wangmm0220 2024-12-31 11:22:56 +08:00
parent b63787aa09
commit 85aad90d3a
3 changed files with 24 additions and 29 deletions

View File

@ -500,7 +500,7 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i
pReader->msg.msgLen = msgLen;
pReader->msg.ver = ver;
tqDebug("tq reader set msg %p %d", msgStr, msgLen);
tqDebug("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);
SDecoder decoder = {0};
tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
@ -539,31 +539,29 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
int32_t code = false;
int32_t lino = 0;
int64_t uid = 0;
TSDB_CHECK_NULL(pReader, code, lino, END, false);
TSDB_CHECK_NULL(pReader->msg.msgStr, code, lino, END, false);
TSDB_CHECK_NULL(pReader->tbIdHash, code, lino, END, true);
int32_t numOfBlocks = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < numOfBlocks) {
tqDebug("try next data block, len:%d ver:%" PRId64 " index:%d/%d, %s", pReader->msg.msgLen, pReader->msg.ver, (pReader->nextBlk + 1), numOfBlocks, idstr);
int32_t blockSz = taosArrayGetSize(pReader->submit.aSubmitTbData);
while (pReader->nextBlk < blockSz) {
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
TSDB_CHECK_NULL(pSubmitTbData, code, lino, END, false);
uid = pSubmitTbData->uid;
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
TSDB_CHECK_CONDITION(ret == NULL, code, lino, END, true);
tqDebug("discard submit block, uid:%" PRId64 ", total queried tables:%d continue %s", pSubmitTbData->uid, taosHashGetSize(pReader->tbIdHash), idstr);
tqDebug("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%"PRId64, pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
pReader->nextBlk++;
}
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0;
pReader->msg.msgStr = NULL;
tqDebug("iterator data block end, block progress:%d/%d, uid:%"PRId64, pReader->nextBlk, blockSz, uid);
END:
tqDebug("%s:%d, uid:%"PRId64",code:%d", __FUNCTION__, lino, uid, code);
tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
return code;
}
@ -583,18 +581,17 @@ bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
uid = pSubmitTbData->uid;
void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
TSDB_CHECK_NULL(ret, code, lino, END, true);
tqDebug("iterator data block in hash continue, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
pReader->nextBlk++;
tqDebug("discard submit block, uid:%" PRId64 ", continue", pSubmitTbData->uid);
}
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0;
pReader->msg.msgStr = NULL;
tqDebug("all data blocks are filtered out");
tqDebug("iterator data block end, block progress:%d/%d, uid:%"PRId64, pReader->nextBlk, blockSz, uid);
END:
tqDebug("%s:%d, uid:%"PRId64",code:%d", __FUNCTION__, lino, uid, code);
tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
return code;
}
@ -856,7 +853,7 @@ int32_t tqRetrieveDataBlock(STqReader* pReader, SSDataBlock** pRes, const char*
END:
if (code != 0) {
tqError("tqRetrieveDataBlock failed, line:%d, code:%d", line, code);
tqError("tqRetrieveDataBlock failed, line:%d, msg:%s", line, tstrerror(code));
}
taosMemoryFreeClear(pTSchema);
return code;
@ -937,7 +934,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData
TQ_NULL_GO_TO_END(pCol);
int32_t numOfRows = pCol->nVal;
int32_t numOfCols = taosArrayGetSize(pCols);
tqDebug("vgId:%d, tqProcessColData, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows);
tqDebug("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows);
for (int32_t i = 0; i < numOfRows; i++) {
bool buildNew = false;
@ -977,7 +974,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData
}
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
tqDebug("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
END:
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
@ -1000,7 +997,7 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra
int32_t numOfRows = taosArrayGetSize(pRows);
pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
TQ_NULL_GO_TO_END(pTSchema);
tqDebug("vgId:%d, tqProcessRowData, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);
tqDebug("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);
for (int32_t i = 0; i < numOfRows; i++) {
bool buildNew = false;
@ -1039,6 +1036,7 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
tqDebug("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows, (int)taosArrayGetSize(blocks));
END:
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
@ -1049,7 +1047,7 @@ END:
}
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, int64_t *createTime) {
tqDebug("tq reader retrieve data block %p, %d", pReader->msg.msgStr, pReader->nextBlk);
tqDebug("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pSubmitTbData == NULL) {
return terrno;

View File

@ -39,7 +39,7 @@ int32_t tqAddBlockDataToRsp(const SSDataBlock* pBlock, SMqDataRsp* pRsp, int32_t
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &actualLen), code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
tqDebug("add block data to response success:%p, blockDataLen:%d, blockData:%p", pRsp->blockDataLen, actualLen, pRsp->blockData);
tqDebug("add block data to block array, blockDataLen:%d, blockData:%p", actualLen, buf);
END:
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(buf);
@ -332,7 +332,7 @@ END:
return code;
}
static int32_t tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){
static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded){
int32_t code = 0;
int32_t lino = 0;
SArray* pBlocks = NULL;
@ -383,7 +383,7 @@ static int32_t tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp,
}
pRsp->blockNum++;
}
tqDebug("vgId:%d, process sub data success", pTq->pVnode->config.vgId);
tqDebug("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
END:
if (code != 0){
tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code));
@ -393,7 +393,6 @@ END:
taosArrayDestroy(pBlocks);
taosArrayDestroy(pSchemas);
}
return code;
}
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, int8_t sourceExcluded) {
@ -410,13 +409,11 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
while (tqNextBlockImpl(pReader, NULL)) {
code = tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
TSDB_CHECK_CODE(code, lino, END);
tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
}
} else if (pExec->subType == TOPIC_SUB_TYPE__DB) {
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
code = tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
TSDB_CHECK_CODE(code, lino, END);
tqProcessSubData(pTq, pHandle, pRsp, totalRows, sourceExcluded);
}
}

View File

@ -279,8 +279,8 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
}
SWalCont* pHead = &pHandle->pWalReader->pHead->head;
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %d",
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, pHead->msgType);
tqDebug("tmq poll: consumer:0x%" PRIx64 " (epoch %d) iter log, vgId:%d offset %" PRId64 " msgType %s",
pRequest->consumerId, pRequest->epoch, vgId, fetchVer, TMSG_INFO(pHead->msgType));
// process meta
if (pHead->msgType != TDMT_VND_SUBMIT) {
@ -385,7 +385,7 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
END:
if (code != 0){
tqError("tmq poll: tqTaosxScanLog error %" PRId64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
tqError("tmq poll: tqTaosxScanLog error. consumerId:0x%" PRIx64 ", in vgId:%d, subkey %s", pRequest->consumerId, vgId,
pRequest->subKey);
}
tDeleteMqBatchMetaRsp(&btMetaRsp);
@ -427,7 +427,7 @@ int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequ
END:
if (code != 0){
uError("failed to extract data for mq, code:%d", code);
uError("failed to extract data for mq, msg:%s", tstrerror(code));
}
tOffsetDestroy(&reqOffset);
return code;