fix:[TS-5776]avoid memcpy fo DataRspObj

This commit is contained in:
wangmm0220 2025-02-11 18:04:53 +08:00
parent 170d78b69f
commit 878bac1556
7 changed files with 59 additions and 46 deletions

View File

@ -1948,10 +1948,10 @@ static int32_t initRawCacheHash() {
}
static bool needRefreshMeta(void* rawData, STableMeta* pTableMeta, SSchemaWrapper* pSW) {
if (rawData == NULL){
if (rawData == NULL || pSW == NULL){
return false;
}
if (pTableMeta == NULL || pSW == NULL) {
if (pTableMeta == NULL) {
uError("invalid parameter in %s", __func__);
return false;
}
@ -2074,7 +2074,7 @@ static int32_t processCacheMeta(SHashObj* pVgHash, SHashObj* pNameHash, SHashObj
SVCreateTbReq* pCreateReqDst, SCatalog* pCatalog, SRequestConnInfo* conn, SName* pName,
STableMeta** pMeta, SSchemaWrapper* pSW, void* rawData, int32_t retry) {
if (pVgHash == NULL || pNameHash == NULL || pMetaHash == NULL || pCatalog == NULL || conn == NULL || pName == NULL ||
pMeta == NULL || pSW == NULL) {
pMeta == NULL) {
uError("invalid parameter in %s", __func__);
return TSDB_CODE_INVALID_PARA;
}
@ -2337,14 +2337,8 @@ static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen)
RAW_NULL_CHECK(pStmt->pVgDataBlocks);
while (++rspObj.resIter < rspObj.dataRsp.blockNum) {
if (!rspObj.dataRsp.withSchema) {
goto end;
}
const char* tbName = (const char*)taosArrayGetP(rspObj.dataRsp.blockTbName, rspObj.resIter);
RAW_NULL_CHECK(tbName);
SSchemaWrapper* pSW = (SSchemaWrapper*)taosArrayGetP(rspObj.dataRsp.blockSchema, rspObj.resIter);
RAW_NULL_CHECK(pSW);
void* pRetrieve = taosArrayGetP(rspObj.dataRsp.blockData, rspObj.resIter);
RAW_NULL_CHECK(pRetrieve);
void* rawData = getRawDataFromRes(pRetrieve);
@ -2358,7 +2352,7 @@ static int32_t tmqWriteRawRawDataImpl(TAOS* taos, void* data, uint32_t dataLen)
// find schema data info
STableMeta* pTableMeta = NULL;
RAW_RETURN_CHECK(processCacheMeta(pVgHash, pNameHash, pMetaHash, NULL, pCatalog, &conn, &pName,
&pTableMeta, pSW, NULL, retry));
&pTableMeta, NULL, NULL, retry));
char err[ERR_MSG_LEN] = {0};
code = rawBlockBindRawData(pVgroupHash, pStmt->pVgDataBlocks, pTableMeta, rawData);
if (code != TSDB_CODE_SUCCESS) {

View File

@ -42,18 +42,19 @@
#define PROCESS_POLL_RSP(FUNC,DATA) \
SDecoder decoder = {0}; \
tDecoderInit(&decoder, POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead)), pMsg->len - sizeof(SMqRspHead)); \
tDecoderInit(&decoder, POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead)), pRspWrapper->pollRsp.len - sizeof(SMqRspHead)); \
if (FUNC(&decoder, DATA) < 0) { \
tDecoderClear(&decoder); \
code = terrno; \
goto END;\
}\
tDecoderClear(&decoder);\
(void)memcpy(DATA, pMsg->pData, sizeof(SMqRspHead));
(void)memcpy(DATA, pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
#define DELETE_POLL_RSP(FUNC,DATA) \
SMqPollRspWrapper* pRsp = &rspWrapper->pollRsp;\
taosMemoryFreeClear(pRsp->pEpset);\
taosMemoryFreeClear(pRsp->pEpset); \
taosMemoryFreeClear(pRsp->data); \
FUNC(DATA);
enum {
@ -190,6 +191,8 @@ typedef struct {
SMqClientTopic* topicHandle;
uint64_t reqId;
SEpSet* pEpset;
void* data;
uint32_t len;
union {
struct{
SMqRspHead head;
@ -1742,7 +1745,7 @@ tmq_t* tmq_consumer_new(tmq_conf_t* conf, char* errstr, int32_t errstrLen) {
pTmq->resetOffsetCfg = conf->resetOffset;
pTmq->replayEnable = conf->replayEnable;
pTmq->sourceExcluded = conf->sourceExcluded;
pTmq->rawData = 1;
pTmq->rawData = conf->rawData;;
pTmq->enableBatchMeta = conf->enableBatchMeta;
tstrncpy(pTmq->user, user, TSDB_USER_LEN);
if (taosGetFqdn(pTmq->fqdn) != 0) {
@ -2068,27 +2071,13 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
}
rspType = ((SMqRspHead*)pMsg->pData)->mqMsgType;
tqDebugC("consumer:0x%" PRIx64 " recv poll rsp, vgId:%d, type %d(%s),QID:0x%" PRIx64, tmq->consumerId, vgId, rspType, tmqMsgTypeStr[rspType], requestId);
if (rspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
} else if (rspType == TMQ_MSG_TYPE__POLL_META_RSP) {
PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
} else if (rspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
} else if (rspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
} else if (rspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp)
pRspWrapper->pollRsp.dataRsp.len = pMsg->len - sizeof(SMqRspHead);
pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pMsg->pData, sizeof(SMqRspHead));
pMsg->pData = NULL;
} else { // invalid rspType
tqErrorC("consumer:0x%" PRIx64 " invalid rsp msg received, type:%d ignored", tmq->consumerId, rspType);
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto END;
}
pRspWrapper->tmqRspType = rspType;
pRspWrapper->pollRsp.reqId = requestId;
pRspWrapper->pollRsp.pEpset = pMsg->pEpSet;
pRspWrapper->pollRsp.data = pMsg->pData;
pRspWrapper->pollRsp.len = pMsg->len;
pMsg->pData = NULL;
pMsg->pEpSet = NULL;
END:
@ -2519,6 +2508,29 @@ END:
return pRspObj;
}
static int32_t processWrapperData(SMqRspWrapper* pRspWrapper){
int32_t code = 0;
if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_RSP) {
PROCESS_POLL_RSP(tDecodeMqDataRsp, &pRspWrapper->pollRsp.dataRsp)
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_META_RSP) {
PROCESS_POLL_RSP(tDecodeMqMetaRsp, &pRspWrapper->pollRsp.metaRsp)
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_DATA_META_RSP) {
PROCESS_POLL_RSP(tDecodeSTaosxRsp, &pRspWrapper->pollRsp.dataRsp)
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_BATCH_META_RSP) {
PROCESS_POLL_RSP(tSemiDecodeMqBatchMetaRsp, &pRspWrapper->pollRsp.batchMetaRsp)
} else if (pRspWrapper->tmqRspType == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {
PROCESS_POLL_RSP(tDecodeMqRawDataRsp, &pRspWrapper->pollRsp.dataRsp)
pRspWrapper->pollRsp.dataRsp.len = pRspWrapper->pollRsp.len - sizeof(SMqRspHead);
pRspWrapper->pollRsp.dataRsp.rawData = POINTER_SHIFT(pRspWrapper->pollRsp.data, sizeof(SMqRspHead));
pRspWrapper->pollRsp.data = NULL;
} else {
tqErrorC("invalid rsp msg received, type:%d ignored", pRspWrapper->tmqRspType);
code = TSDB_CODE_TSC_INTERNAL_ERROR;
goto END;
}
END:
return code;
}
static void* tmqHandleAllRsp(tmq_t* tmq) {
tqDebugC("consumer:0x%" PRIx64 " start to handle the rsp, total:%d", tmq->consumerId, taosQallItemSize(tmq->qall));
@ -2538,12 +2550,16 @@ static void* tmqHandleAllRsp(tmq_t* tmq) {
}
tqDebugC("consumer:0x%" PRIx64 " handle rsp, type:%s", tmq->consumerId, tmqMsgTypeStr[pRspWrapper->tmqRspType]);
if (pRspWrapper->code != 0) {
code = processMqRspError(tmq, pRspWrapper);
}else{
returnVal = processMqRsp(tmq, pRspWrapper);
code = terrno;
code = processWrapperData(pRspWrapper);
if (code == 0){
if (pRspWrapper->code != 0) {
code = processMqRspError(tmq, pRspWrapper);
}else{
returnVal = processMqRsp(tmq, pRspWrapper);
code = terrno;
}
}
tmqFreeRspWrapper(pRspWrapper);
taosFreeQitem(pRspWrapper);
if(returnVal != NULL || code != 0){

View File

@ -11445,9 +11445,9 @@ int32_t tDecodeMqDataRspCommon(SDecoder *pDecoder, SMqDataRsp *pRsp) {
}
for (int32_t i = 0; i < pRsp->blockNum; i++) {
void *data;
uint64_t bLen;
TAOS_CHECK_EXIT(tDecodeBinaryAlloc(pDecoder, &data, &bLen));
void *data = NULL;
uint32_t bLen = 0;
TAOS_CHECK_EXIT(tDecodeBinary(pDecoder, (uint8_t**)&data, &bLen));
if (taosArrayPush(pRsp->blockData, &data) == NULL) {
TAOS_CHECK_EXIT(terrno);
}
@ -11510,7 +11510,7 @@ _exit:
static void tDeleteMqDataRspCommon(SMqDataRsp *pRsp) {
taosArrayDestroy(pRsp->blockDataLen);
pRsp->blockDataLen = NULL;
taosArrayDestroyP(pRsp->blockData, NULL);
taosArrayDestroy(pRsp->blockData);
pRsp->blockData = NULL;
taosArrayDestroyP(pRsp->blockSchema, (FDelete)tDeleteSchemaWrapper);
pRsp->blockSchema = NULL;

View File

@ -118,7 +118,7 @@ int32_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, uint64_t
// tqExec
int32_t tqTaosxScanLog(STQ* pTq, STqHandle* pHandle, SPackedData submit, SMqDataRsp* pRsp, int32_t* totalRows, const SMqPollReq* pRequest);
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp,
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, SMqDataRsp* pRsp,
int32_t type, int32_t vgId);
void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId);
@ -147,7 +147,7 @@ int32_t tqOffsetRestoreFromFile(STQ* pTq, char* name);
// tq util
int32_t tqExtractDelDataBlock(const void* pData, int32_t len, int64_t ver, void** pRefBlock, int32_t type, EStreamType blockType);
int32_t tqExtractDataForMq(STQ* pTq, STqHandle* pHandle, const SMqPollReq* pRequest, SRpcMsg* pMsg);
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever);
int32_t tqInitDataRsp(SMqDataRsp* pRsp, STqOffsetVal pOffset);
void tqUpdateNodeStage(STQ* pTq, bool isLeader);

View File

@ -208,7 +208,7 @@ void tqPushEmptyDataRsp(STqHandle* pHandle, int32_t vgId) {
tDeleteMqDataRsp(&dataRsp);
}
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, const SMqDataRsp* pRsp, int32_t type,
int32_t tqSendDataRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPollReq* pReq, SMqDataRsp* pRsp, int32_t type,
int32_t vgId) {
if (pHandle == NULL || pMsg == NULL || pReq == NULL || pRsp == NULL) {
return TSDB_CODE_INVALID_PARA;

View File

@ -263,7 +263,7 @@ int32_t tqScanTaosx(STQ* pTq, const STqHandle* pHandle, SMqDataRsp* pRsp, SMqBat
tbName = NULL;
}
if (pRsp->withSchema) {
SSchemaWrapper* pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
pSW = tCloneSSchemaWrapper(qExtractSchemaFromTask(task));
TSDB_CHECK_NULL(pSW, code, lino, END, terrno);
TSDB_CHECK_NULL(taosArrayPush(pRsp->blockSchema, &pSW), code, lino, END, terrno);
pSW = NULL;

View File

@ -533,7 +533,7 @@ int32_t tqSendMetaPollRsp(STqHandle* pHandle, const SRpcMsg* pMsg, const SMqPoll
return 0;
}
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, SMqDataRsp* pRsp, int32_t epoch, int64_t consumerId,
int32_t type, int64_t sver, int64_t ever) {
if (pRpcHandleInfo == NULL || pRsp == NULL) {
return TSDB_CODE_TMQ_INVALID_MSG;
@ -541,6 +541,9 @@ int32_t tqDoSendDataRsp(const SRpcHandleInfo* pRpcHandleInfo, const SMqDataRsp*
int32_t len = 0;
int32_t code = 0;
if (type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP){
pRsp->withSchema = 0;
}
if (type == TMQ_MSG_TYPE__POLL_DATA_RSP ||
type == TMQ_MSG_TYPE__WALINFO_RSP ||
type == TMQ_MSG_TYPE__POLL_RAW_DATA_RSP) {