fix:[TS-5776]add raw type from consumer

This commit is contained in:
wangmm0220 2025-02-13 13:54:54 +08:00
parent c3f4c4cf27
commit daea0ec61a
5 changed files with 24 additions and 24 deletions

View File

@ -2173,7 +2173,7 @@ static int32_t tmqWriteRawDataImpl(TAOS* taos, void* data, uint32_t dataLen) {
void* rawData = getRawDataFromRes(pRetrieve);
RAW_NULL_CHECK(rawData);
uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
@ -2256,7 +2256,7 @@ static int32_t tmqWriteRawMetaDataImpl(TAOS* taos, void* data, uint32_t dataLen)
void* rawData = getRawDataFromRes(pRetrieve);
RAW_NULL_CHECK(rawData);
uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);
@ -2344,7 +2344,7 @@ static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen)
void* rawData = getRawDataFromRes(pRetrieve);
RAW_NULL_CHECK(rawData);
uDebug(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
uTrace(LOG_ID_TAG " write raw data block tbname:%s", LOG_ID_VALUE, tbName);
SName pName = {TSDB_TABLE_NAME_T, pRequest->pTscObj->acctId, {0}, {0}};
tstrncpy(pName.dbname, pRequest->pDb, TSDB_DB_NAME_LEN);
tstrncpy(pName.tname, tbName, TSDB_TABLE_NAME_LEN);

View File

@ -183,7 +183,7 @@ int32_t tqSendAllNotifyEvents(const SArray* pBlocks, SStreamTask* pTask, SVnode*
#define TQ_SUBSCRIBE_NAME "subscribe"
#define TQ_OFFSET_NAME "offset-ver0"
#define TQ_POLL_MAX_TIME 1000
#define TQ_POLL_MAX_BYTES 10485760
#define TQ_POLL_MAX_BYTES 1048576
#ifdef __cplusplus
}

View File

@ -500,7 +500,7 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i
pReader->msg.msgLen = msgLen;
pReader->msg.ver = ver;
tqDebug("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);
tqTrace("tq reader set msg pointer:%p, msg len:%d", msgStr, msgLen);
SDecoder decoder = {0};
tDecoderInit(&decoder, pReader->msg.msgStr, pReader->msg.msgLen);
@ -558,15 +558,15 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
TSDB_CHECK_CONDITION(ret == NULL, code, lino, END, true);
tqDebug("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
pReader->nextBlk++;
}
tqReaderClearSubmitMsg(pReader);
tqDebug("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid);
tqTrace("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid);
END:
tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
tqTrace("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
return code;
}
@ -586,14 +586,14 @@ bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
uid = pSubmitTbData->uid;
void* ret = taosHashGet(filterOutUids, &pSubmitTbData->uid, sizeof(int64_t));
TSDB_CHECK_NULL(ret, code, lino, END, true);
tqDebug("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
tqTrace("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
pReader->nextBlk++;
}
tqReaderClearSubmitMsg(pReader);
tqDebug("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid);
tqTrace("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid);
END:
tqDebug("%s:%d get data:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
tqTrace("%s:%d get data:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
return code;
}
@ -936,7 +936,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData
TQ_NULL_GO_TO_END(pCol);
int32_t numOfRows = pCol->nVal;
int32_t numOfCols = taosArrayGetSize(pCols);
tqDebug("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows);
tqTrace("vgId:%d, tqProcessColData start, col num: %d, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows);
for (int32_t i = 0; i < numOfRows; i++) {
bool buildNew = false;
@ -976,7 +976,7 @@ static int32_t tqProcessColData(STqReader* pReader, SSubmitTbData* pSubmitTbData
}
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
tqDebug("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
tqTrace("vgId:%d, tqProcessColData end, col num: %d, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfCols, numOfRows, (int)taosArrayGetSize(blocks));
END:
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, process col data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
@ -999,7 +999,7 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra
int32_t numOfRows = taosArrayGetSize(pRows);
pTSchema = tBuildTSchema(pSchemaWrapper->pSchema, pSchemaWrapper->nCols, pSchemaWrapper->version);
TQ_NULL_GO_TO_END(pTSchema);
tqDebug("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);
tqTrace("vgId:%d, tqProcessRowData start, rows:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows);
for (int32_t i = 0; i < numOfRows; i++) {
bool buildNew = false;
@ -1038,7 +1038,7 @@ int32_t tqProcessRowData(STqReader* pReader, SSubmitTbData* pSubmitTbData, SArra
SSDataBlock* pLastBlock = taosArrayGetLast(blocks);
pLastBlock->info.rows = curRow - lastRow;
tqDebug("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows, (int)taosArrayGetSize(blocks));
tqTrace("vgId:%d, tqProcessRowData end, rows:%d, block num:%d", pReader->pWalReader->pWal->cfg.vgId, numOfRows, (int)taosArrayGetSize(blocks));
END:
if (code != TSDB_CODE_SUCCESS) {
tqError("vgId:%d, process row data failed, code:%d", pReader->pWalReader->pWal->cfg.vgId, code);
@ -1076,7 +1076,7 @@ static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){
TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableLen, &len), code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->createTableReq, &createReq), code, lino, END, terrno);
pRsp->createTableNum++;
tqDebug("build create table info msg success");
tqTrace("build create table info msg success");
END:
if (code != 0){
@ -1087,7 +1087,7 @@ static int32_t buildCreateTbInfo(SMqDataRsp* pRsp, SVCreateTbReq* pCreateTbReq){
}
int32_t tqRetrieveTaosxBlock(STqReader* pReader, SMqDataRsp* pRsp, SArray* blocks, SArray* schemas, SSubmitTbData** pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta) {
tqDebug("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
tqTrace("tq reader retrieve data block msg pointer:%p, index:%d", pReader->msg.msgStr, pReader->nextBlk);
SSubmitTbData* pSubmitTbData = taosArrayGet(pReader->submit.aSubmitTbData, pReader->nextBlk);
if (pSubmitTbData == NULL) {
return terrno;

View File

@ -31,11 +31,8 @@ static int32_t tqAddRawDataToRsp(const void* rawData, SMqDataRsp* pRsp, int8_t p
memcpy(pRetrieve->data, rawData, *(uint32_t *)rawData + INT_BYTES);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockDataLen, &dataStrLen), code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockData, &buf), code, lino, END, terrno);
// for (int m= 0; m < 56; m++){
// printf("add data[%d] = %d\n", m, *((int8_t *)rawData+m));
// }
tqDebug("add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf);
tqTrace("add block data to block array, blockDataLen:%d, blockData:%p", dataStrLen, buf);
END:
if (code != TSDB_CODE_SUCCESS) {
taosMemoryFree(buf);
@ -95,7 +92,7 @@ static int32_t tqAddTbNameToRsp(const STQ* pTq, int64_t uid, SMqDataRsp* pRsp, i
tqError("failed to push tbName to blockTbName:%s, uid:%"PRId64, tbName, uid);
continue;
}
tqDebug("add tbName to response success tbname:%s, uid:%"PRId64, tbName, uid);
tqTrace("add tbName to response success tbname:%s, uid:%"PRId64, tbName, uid);
}
END:
@ -399,7 +396,7 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
*pSW = NULL;
pRsp->blockNum++;
}
tqDebug("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
tqTrace("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
END:
if (code != 0) {
tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code));

View File

@ -378,7 +378,10 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
if ((pRequest->rawData == 0 && totalRows >= tmqRowSize) ||
(taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout)) ||
(pRequest->rawData != 0 && (totalRows >= TQ_POLL_MAX_BYTES || taosxRsp.createTableNum > 0 || terrno == TSDB_CODE_TMQ_DUPLICATE_UID))) {
(pRequest->rawData != 0 && (totalRows >= TQ_POLL_MAX_BYTES ||
taosxRsp.createTableNum > 0 ||
taosArrayGetSize(taosxRsp.blockData) > tmqRowSize ||
terrno == TSDB_CODE_TMQ_DUPLICATE_UID))) {
tqOffsetResetToLog(&taosxRsp.rspOffset, terrno == TSDB_CODE_TMQ_DUPLICATE_UID ? fetchVer : fetchVer + 1);
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
POLL_RSP_TYPE(pRequest, taosxRsp), vgId);