fix:[TS-5776]add raw type from consumer

This commit is contained in:
wangmm0220 2025-01-24 14:56:00 +08:00
parent 7fad4bceb0
commit 902d067776
6 changed files with 22 additions and 29 deletions

View File

@ -2385,7 +2385,7 @@ static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen)
end:
uDebug(LOG_ID_TAG " write raw rawdata return, msg:%s", LOG_ID_VALUE, tstrerror(code));
tDeleteSTaosxRsp(&rspObj.dataRsp);
tDeleteMqDataRsp(&rspObj.dataRsp);
tDecoderClear(&decoder);
qDestroyQuery(pQuery);
taosHashCleanup(pVgroupHash);

View File

@ -11734,7 +11734,7 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
}
TAOS_CHECK_EXIT(tDecodeSVCreateTbReq(pCoder, pSubmitTbData->pCreateTbReq));
dataAfterCreate = pCoder->data;
dataAfterCreate = pCoder->data + pCoder->pos;
posAfterCreate = pCoder->pos;
}

View File

@ -242,6 +242,7 @@ int64_t tqGetResultBlockTime(STqReader *pReader);
int32_t extractMsgFromWal(SWalReader *pReader, void **pItem, int64_t maxVer, const char *id);
int32_t tqReaderSetSubmitMsg(STqReader *pReader, void *msgStr, int32_t msgLen, int64_t ver, SArray* rawList);
void tqReaderClearSubmitMsg(STqReader *pReader);
bool tqNextDataBlockFilterOut(STqReader *pReader, SHashObj *filterOutUids);
int32_t tqRetrieveDataBlock(STqReader *pReader, SSDataBlock **pRes, const char *idstr);
int32_t tqRetrieveTaosxBlock(STqReader *pReader, SMqDataRsp* pRsp, SArray *blocks, SArray *schemas, SSubmitTbData **pSubmitTbDataRet, SArray* rawList, int8_t fetchMeta);

View File

@ -514,6 +514,13 @@ int32_t tqReaderSetSubmitMsg(STqReader* pReader, void* msgStr, int32_t msgLen, i
return code;
}
void tqReaderClearSubmitMsg(STqReader *pReader) {
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0;
pReader->msg.msgStr = NULL;
}
SWalReader* tqGetWalReader(STqReader* pReader) {
if (pReader == NULL) {
return NULL;
@ -551,14 +558,12 @@ bool tqNextBlockImpl(STqReader* pReader, const char* idstr) {
void* ret = taosHashGet(pReader->tbIdHash, &pSubmitTbData->uid, sizeof(int64_t));
TSDB_CHECK_CONDITION(ret == NULL, code, lino, END, true);
tqDebug("iterator data block in hash continue, progress:%d/%d, total queried tables:%d, uid:%"PRId64, pReader->nextBlk, blockSz, taosHashGetSize(pReader->tbIdHash), uid);
tqDebug("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
pReader->nextBlk++;
}
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0;
pReader->msg.msgStr = NULL;
tqDebug("iterator data block end, block progress:%d/%d, uid:%"PRId64, pReader->nextBlk, blockSz, uid);
tqReaderClearSubmitMsg(pReader);
tqDebug("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid);
END:
tqDebug("%s:%d return:%s, uid:%"PRId64, __FUNCTION__, lino, code?"true":"false", uid);
@ -584,10 +589,7 @@ bool tqNextDataBlockFilterOut(STqReader* pReader, SHashObj* filterOutUids) {
tqDebug("iterator data block in hash jump block, progress:%d/%d, uid:%" PRId64 "", pReader->nextBlk, blockSz, uid);
pReader->nextBlk++;
}
tDestroySubmitReq(&pReader->submit, TSDB_MSG_FLG_DECODE);
pReader->nextBlk = 0;
pReader->msg.msgStr = NULL;
tqReaderClearSubmitMsg(pReader);
tqDebug("iterator data block end, total block num:%d, uid:%"PRId64, blockSz, uid);
END:

View File

@ -344,7 +344,6 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
if (taosHashGet(pRequest->uidHash, &pExec->pTqReader->lastBlkUid, LONG_BYTES) != NULL) {
tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 " is already exists", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid);
terrno = TSDB_CODE_TMQ_DUPLICATE_UID;
pReader->nextBlk = 0;
goto END;
} else {
code = taosHashPut(pRequest->uidHash, &pExec->pTqReader->lastBlkUid, LONG_BYTES, &pExec->pTqReader->lastBlkUid, LONG_BYTES);
@ -357,7 +356,6 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
tqDebug("poll rawdata split,vgId:%d, uid:%" PRId64 ", this submit data is metadata and previous data is data", pTq->pVnode->config.vgId, pExec->pTqReader->lastBlkUid);
terrno = TSDB_CODE_TMQ_DUPLICATE_UID;
pRsp->createTableNum = 0;
pReader->nextBlk = 0;
goto END;
}
@ -391,26 +389,23 @@ static void tqProcessSubData(STQ* pTq, STqHandle* pHandle, SMqDataRsp* pRsp, int
continue;
}
*totalRows += pBlock->info.rows;
blockDataFreeRes(pBlock);
}
SSchemaWrapper* pSW = taosArrayGetP(pSchemas, i);
if (taosArrayPush(pRsp->blockSchema, &pSW) == NULL){
void** pSW = taosArrayGet(pSchemas, i);
if (taosArrayPush(pRsp->blockSchema, pSW) == NULL){
tqError("vgId:%d, failed to add schema to rsp msg", pTq->pVnode->config.vgId);
continue;
}
*pSW = NULL;
pRsp->blockNum++;
}
tqDebug("vgId:%d, process sub data success, response blocknum:%d, rows:%d", pTq->pVnode->config.vgId, pRsp->blockNum, *totalRows);
END:
if (code != 0){
if (code != 0) {
tqError("%s failed at %d, failed to process sub data:%s", __FUNCTION__, lino, tstrerror(code));
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
} else {
taosArrayDestroy(pBlocks);
taosArrayDestroy(pSchemas);
}
taosArrayDestroyEx(pBlocks, (FDelete)blockDataFreeRes);
taosArrayDestroyP(pSchemas, (FDelete)tDeleteSchemaWrapper);
}
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest) {
@ -430,6 +425,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData
while (tqNextBlockImpl(pReader, NULL)) {
tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){
tqReaderClearSubmitMsg(pReader);
goto END;
}
}
@ -437,6 +433,7 @@ int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqData
while (tqNextDataBlockFilterOut(pReader, pExec->execDb.pFilterOutTbUid)) {
tqProcessSubData(pTq, pHandle, pRsp, totalRows, pRequest, rawList);
if (terrno == TSDB_CODE_TMQ_DUPLICATE_UID){
tqReaderClearSubmitMsg(pReader);
goto END;
}
}

View File

@ -859,13 +859,6 @@ int32_t insBuildVgDataBlocks(SHashObj* pVgroupsHashObj, SArray* pVgDataCxtList,
}
if (TSDB_CODE_SUCCESS == code) {
code = buildSubmitReq(src->vgId, src->pData, &dst->pData, &dst->size);
SSubmitReq2 *pSubmitReq = &(SSubmitReq2){0};
SDecoder dc = {0};
tDecoderInit(&dc, POINTER_SHIFT(dst->pData, sizeof(SSubmitReq2Msg)), dst->size - sizeof(SSubmitReq2Msg));
if (tDecodeSubmitReq(&dc, pSubmitReq, NULL) < 0) {
code = TSDB_CODE_INVALID_MSG;
}
tDecoderClear(&dc);
}
if (TSDB_CODE_SUCCESS == code) {
code = (NULL == taosArrayPush(pDataBlocks, &dst) ? terrno : TSDB_CODE_SUCCESS);