From daea0ec61aa38aa2cb2b9352f375b4ee4d0752f5 Mon Sep 17 00:00:00 2001 From: wangmm0220 Date: Thu, 13 Feb 2025 13:54:54 +0800 Subject: [PATCH] fix:[TS-5776]add raw type from consumer --- source/client/src/clientRawBlockWrite.c | 6 +++--- source/dnode/vnode/src/inc/tq.h | 2 +- source/dnode/vnode/src/tq/tqRead.c | 26 ++++++++++++------------- source/dnode/vnode/src/tq/tqScan.c | 9 +++------ source/dnode/vnode/src/tq/tqUtil.c | 5 ++++- 5 files changed, 24 insertions(+), 24 deletions(-) diff --git a/source/client/src/clientRawBlockWrite.c b/source/client/src/clientRawBlockWrite.c index 7f3a696f26..1ae4bd195c 100644 --- a/source/client/src/clientRawBlockWrite.c +++ b/source/client/src/clientRawBlockWrite.c @@ -2173,7 +2173,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) { void* rawData = getRawDataFromRes(pRetrieve); RAW_NULL_CHECK(rawData); - uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName); + uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName); SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN); tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN); @@ -2256,7 +2256,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen) void* rawData = getRawDataFromRes(pRetrieve); RAW_NULL_CHECK(rawData); - uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName); + uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName); SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN); tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN); @@ -2344,7 +2344,7 @@ static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) void* rawData = getRawDataFromRes(pRetrieve); RAW_NULL_CHECK(rawData); - uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName); + uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName); SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}}; tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN); tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN); diff --git a/source/dnode/vnode/src/inc/tq.h b/source/dnode/vnode/src/inc/tq.h index ec82fd8e71..2c06ff1f07 100644 --- a/source/dnode/vnode/src/inc/tq.h +++ b/source/dnode/vnode/src/inc/tq.h @@ -183,7 +183,7 @@ int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode* #define TQ_SUBSCRIBE_NAME "subscribe" #define TQ_OFFSET_NAME "offset-ver0" #define TQ_POLL_MAX_TIME 1000 -#define TQ_POLL_MAX_BYTES 10485760 +#define TQ_POLL_MAX_BYTES 1048576 #ifdef __cplusplus } diff --git a/source/dnode/vnode/src/tq/tqRead.c b/source/dnode/vnode/src/tq/tqRead.c index 796a4a8742..792763862b 100644 --- a/source/dnode/vnode/src/tq/tqRead.c +++ b/source/dnode/vnode/src/tq/tqRead.c @@ -500,7 +500,7 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i pReader->msg.msgLen = msgLen; pReader->msg.ver = ver; - tqDebug("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen); + tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen); SDecoder decoder = {0}; tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen); @@ -558,15 +558,15 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) { void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t)); TSDB_CHECK_CONDITION(ret == NULL, code, lino, END, true); - tqDebug("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid); + tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid); pReader->nextBlk++; } tqReaderClearSubmitMsg(pReader); - tqDebug("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid); + tqTrace("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid); END: - tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid); + tqTrace("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid); return code; } @@ -586,14 +586,14 @@ bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) { uid = pSubmitTbData->uid; void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t)); TSDB_CHECK_NULL(ret, code, lino, END, true); - tqDebug("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid); + tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid); pReader->nextBlk++; } tqReaderClearSubmitMsg(pReader); - tqDebug("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid); + tqTrace("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid); END: - tqDebug("%s:%d get data:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid); + tqTrace("%s:%d get data:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid); return code; } @@ -936,7 +936,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData TQ_NULL_GO_TO_END(pCol); int32_t numOfRows = pCol->nVal; int32_t numOfCols = taosArrayGetSize(pCols); - tqDebug("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows); + tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows); for (int32_t i = 0; i < numOfRows; i++) { bool buildNew = false; @@ -976,7 +976,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData } SSDataBlock* pLastBlock = taosArrayGetLast(blocks); pLastBlock->info.rows = curRow - lastRow; - tqDebug("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows, (int)taosArrayGetSize(blocks)); + tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows, (int)taosArrayGetSize(blocks)); END: if (code != TSDB_CODE_SUCCESS) { tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code); @@ -999,7 +999,7 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra int32_t numOfRows = taosArrayGetSize(pRows); pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version); TQ_NULL_GO_TO_END(pTSchema); - tqDebug("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows); + tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows); for (int32_t i = 0; i < numOfRows; i++) { bool buildNew = false; @@ -1038,7 +1038,7 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra SSDataBlock* pLastBlock = taosArrayGetLast(blocks); pLastBlock->info.rows = curRow - lastRow; - tqDebug("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows, (int)taosArrayGetSize(blocks)); + tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows, (int)taosArrayGetSize(blocks)); END: if (code != TSDB_CODE_SUCCESS) { tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code); @@ -1076,7 +1076,7 @@ static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){ TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno); TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno); pRsp->createTableNum++; - tqDebug("build create table info msg success"); + tqTrace("build create table info msg success"); END: if (code != 0){ @@ -1087,7 +1087,7 @@ static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){ } int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) { - tqDebug("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk); + tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk); SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk); if (pSubmitTbData == NULL) { return terrno; diff --git a/source/dnode/vnode/src/tq/tqScan.c b/source/dnode/vnode/src/tq/tqScan.c index 941e74fd9b..265e165038 100644 --- a/source/dnode/vnode/src/tq/tqScan.c +++ b/source/dnode/vnode/src/tq/tqScan.c @@ -31,11 +31,8 @@ static int32_t tqAddRawDataToRsp(const void* rawData, SMqDataRsp* pRsp, int8_t p memcpy(pRetrieve->data, rawData, *(uint32_t *)rawData + INT_BYTES); TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &dataStrLen), code, lino, END, terrno); TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno); -// for (int m= 0; m < 56; m++){ -// printf("add data[%d] = %d\n", m, *((int8_t *)rawData+m)); -// } - tqDebug("add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf); + tqTrace("add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf); END: if (code != TSDB_CODE_SUCCESS) { taosMemoryFree(buf); @@ -95,7 +92,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i tqError("failed to push tbName to blockTbName:%s, uid:%"PRId64, tbName, uid); continue; } - tqDebug("add tbName to response success tbname:%s, uid:%"PRId64, tbName, uid); + tqTrace("add tbName to response success tbname:%s, uid:%"PRId64, tbName, uid); } END: @@ -399,7 +396,7 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int *pSW = NULL; pRsp->blockNum++; } - tqDebug("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows); + tqTrace("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows); END: if (code != 0) { tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code)); diff --git a/source/dnode/vnode/src/tq/tqUtil.c b/source/dnode/vnode/src/tq/tqUtil.c index bb6751c883..5dd8de9429 100644 --- a/source/dnode/vnode/src/tq/tqUtil.c +++ b/source/dnode/vnode/src/tq/tqUtil.c @@ -378,7 +378,10 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle, if ((pRequest->rawData == 0 && totalRows >= tmqRowSize) || (taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout)) || - (pRequest->rawData != 0 && (totalRows >= TQ_POLL_MAX_BYTES || taosxRsp.createTableNum > 0 || terrno == TSDB_CODE_TMQ_DUPLICATE_UID))) { + (pRequest->rawData != 0 && (totalRows >= TQ_POLL_MAX_BYTES || + taosxRsp.createTableNum > 0 || + taosArrayGetSize(taosxRsp.blockData) > tmqRowSize || + terrno == TSDB_CODE_TMQ_DUPLICATE_UID))) { tqOffsetResetToLog(&taosxRsp.rspOffset, terrno == TSDB_CODE_TMQ_DUPLICATE_UID ? fetchVer : fetchVer + 1); code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp, POLL_RSP_TYPE(pRequest, taosxRsp), vgId);