fix:[TS-5776]add raw type from consumer
This commit is contained in:
parent
f16f21897e
commit
39f77e45fc
|
@ -462,6 +462,13 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData
|
||||||
goto END;
|
goto END;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// this submit data is rawdata and previous data is metadata
|
||||||
|
if (pRequest->rawData != 0 && pRsp->createTableNum > 0 && rawList != NULL){
|
||||||
|
tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 ", this submit data is metadata and previous data is data", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid);
|
||||||
|
terrno = TSDB_CODE_TMQ_RAW_DATA_SPLIT;
|
||||||
|
goto END;
|
||||||
|
}
|
||||||
|
|
||||||
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
if (pExec->subType == TOPIC_SUB_TYPE__TABLE) {
|
||||||
while (tqNextBlockImpl(pReader, NULL)) {
|
while (tqNextBlockImpl(pReader, NULL)) {
|
||||||
tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
|
tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
|
||||||
|
|
|
@ -379,11 +379,10 @@ static int32_t extractDataAndRspForDbStbSubscribe(STQ* pTq, STqHandle* pHandle,
|
||||||
if ((pRequest->rawData == 0 && totalRows >= tmqRowSize) ||
|
if ((pRequest->rawData == 0 && totalRows >= tmqRowSize) ||
|
||||||
(taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout)) ||
|
(taosGetTimestampMs() - st > TMIN(TQ_POLL_MAX_TIME, pRequest->timeout)) ||
|
||||||
(pRequest->rawData != 0 && (totalRows >= TQ_POLL_MAX_BYTES ||
|
(pRequest->rawData != 0 && (totalRows >= TQ_POLL_MAX_BYTES ||
|
||||||
taosxRsp.createTableNum > 0 ||
|
|
||||||
taosArrayGetSize(taosxRsp.blockData) > tmqRowSize ||
|
taosArrayGetSize(taosxRsp.blockData) > tmqRowSize ||
|
||||||
terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
|
terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT))) {
|
||||||
tqDebug("start to send rsp, block num:%d, totalRows:%d, createTableNum:%d, terrno:%d",
|
// tqDebug("start to send rsp, block num:%d, totalRows:%d, createTableNum:%d, terrno:%d",
|
||||||
(int)taosArrayGetSize(taosxRsp.blockData), totalRows, taosxRsp.createTableNum, terrno);
|
// (int)taosArrayGetSize(taosxRsp.blockData), totalRows, taosxRsp.createTableNum, terrno);
|
||||||
tqOffsetResetToLog(&taosxRsp.rspOffset, terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT ? fetchVer : fetchVer + 1);
|
tqOffsetResetToLog(&taosxRsp.rspOffset, terrno == TSDB_CODE_TMQ_RAW_DATA_SPLIT ? fetchVer : fetchVer + 1);
|
||||||
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
|
code = tqSendDataRsp(pHandle, pMsg, pRequest, &taosxRsp,
|
||||||
POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
|
POLL_RSP_TYPE(pRequest, taosxRsp), vgId);
|
||||||
|
|
Loading…
Reference in New Issue